From bf81b6840dba16b506187c53448db8e09a6a9f14 Mon Sep 17 00:00:00 2001
From: Zhengdai Hu
Date: Fri, 20 Jul 2018 13:41:51 -0500
Subject: [PATCH] STORM-3133: Extend metrics on Nimbus and LogViewer

STORM-3157: Added registration method for MetricSet
STORM-3133: Refactored and added metrics to LogViewer components
STORM-3133: Fixed up unit test for LogViewer
STORM-3133: Refactored and added metrics to Nimbus components
STORM-3133: Add Nimbus scheduling metrics
STORM-3133: Add metrics for disk usage of workers' logs and performance of the LogCleaner routine
STORM-3133: Refactored code and added a file partial-read count metric for the LogViewer
STORM-3133: Add metrics counting the LogViewer's IOExceptions
---
 .../apache/storm/scheduler/WorkerSlot.java    |   7 +
 .../apache/storm/daemon/nimbus/Nimbus.java    | 228 +++++++++++-------
 .../daemon/supervisor/SupervisorUtils.java    |   9 +-
 .../storm/localizer/LocallyCachedBlob.java    |   3 +-
 .../storm/metric/StormMetricsRegistry.java    |  25 +-
 .../storm/nimbus/LeaderListenerCallback.java  |   7 +
 .../org/apache/storm/scheduler/Cluster.java   |  12 +-
 .../storm/scheduler/ExecutorDetails.java      |   9 +-
 .../metric/StormMetricsRegistryTest.java      | 111 +++++++++
 .../daemon/logviewer/LogviewerServer.java     |   6 +-
 .../handler/LogviewerLogPageHandler.java      | 102 ++++----
 .../handler/LogviewerLogSearchHandler.java    | 159 +++++++-----
 .../daemon/logviewer/utils/DeletionMeta.java  |  31 +++
 .../logviewer/utils/DirectoryCleaner.java     |  25 +-
 .../logviewer/utils/ExceptionMeters.java      |  66 +++++
 .../daemon/logviewer/utils/LogCleaner.java    |  48 ++--
 .../logviewer/utils/LogFileDownloader.java    |   8 +
 .../utils/LogviewerResponseBuilder.java       |  19 +-
 .../daemon/logviewer/utils/WorkerLogs.java    |  16 +-
 .../logviewer/webapp/LogviewerResource.java   |  78 +++++-
 .../LogviewerLogSearchHandlerTest.java        |   3 +-
 .../logviewer/utils/LogCleanerTest.java       |   9 +-
 22 files changed, 722 insertions(+), 259 deletions(-)
 create mode 100644 storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
 create mode 100644 storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
 create mode 100644 storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java

diff --git a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
index 07064db7fe5..fa963d22f9a 100644
--- a/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
+++ b/storm-client/src/jvm/org/apache/storm/scheduler/WorkerSlot.java
@@ -12,6 +12,9 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class WorkerSlot {
     private final String nodeId;
     private final int port;
@@ -39,6 +42,10 @@ public String getId() {
         return getNodeId() + ":" + getPort();
     }
 
+    public List<Object> toList() {
+        return Arrays.asList(nodeId, (long) port);
+    }
+
     @Override
     public int hashCode() {
         return nodeId.hashCode() + 13 * ((Integer) port).hashCode();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index c401f604d1c..a09621731f5 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -18,9 +18,9 @@
 
 package org.apache.storm.daemon.nimbus;
 
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -37,6 +37,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -45,6 +46,7 @@
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -53,6 +55,7 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
+
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.DaemonConfig;
@@ -181,6 +184,8 @@
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.base.Strings;
 import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
+import org.apache.storm.shade.com.google.common.collect.MapDifference;
+import org.apache.storm.shade.com.google.common.collect.Maps;
 import org.apache.storm.shade.org.apache.curator.framework.CuratorFramework;
 import org.apache.storm.shade.org.apache.zookeeper.ZooDefs;
 import org.apache.storm.shade.org.apache.zookeeper.data.ACL;
@@ -251,10 +256,18 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
     private static final Meter getTopologyPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getTopologyPageInfo-calls");
     private static final Meter getSupervisorPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getSupervisorPageInfo-calls");
     private static final Meter getComponentPageInfoCalls = StormMetricsRegistry.registerMeter("nimbus:num-getComponentPageInfo-calls");
-    private static final Histogram scheduleTopologyTimeMs = StormMetricsRegistry.registerHistogram("nimbus:time-scheduleTopology-ms",
-        new ExponentiallyDecayingReservoir());
     private static final Meter getOwnerResourceSummariesCalls = StormMetricsRegistry.registerMeter(
         "nimbus:num-getOwnerResourceSummaries-calls");
+    //Timer
+    private static final Timer fileUploadDuration = StormMetricsRegistry.registerTimer("nimbus:files-upload-duration-ms");
+    private static final Timer schedulingDuration = StormMetricsRegistry.registerTimer("nimbus:topology-scheduling-duration-ms");
+    //Scheduler histogram
+    private static final Histogram numAddedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-executors-per-scheduling");
+    private static final Histogram numAddedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-added-slots-per-scheduling");
+    private static final Histogram numRemovedExecPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-executors-per-scheduling");
+    private static final Histogram numRemovedSlotPerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-removed-slots-per-scheduling");
+    private static final Histogram numNetExecIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-executors-increase-per-scheduling");
+    private static final Histogram numNetSlotIncreasePerScheduling = StormMetricsRegistry.registerHistogram("nimbus:num-net-slots-increase-per-scheduling");
     // END Metrics
     private static final Meter shutdownCalls = StormMetricsRegistry.registerMeter("nimbus:num-shutdown-calls");
     private static final Meter processWorkerMetricsCalls =
        StormMetricsRegistry.registerMeter("nimbus:process-worker-metric-calls");
@@ -411,6 +424,10 @@ public static List<ACL> getNimbusAcls(Map<String, Object> conf) {
     private final StormTimer timer;
     private final IScheduler scheduler;
     private final IScheduler underlyingScheduler;
+    //Metrics related
+    private final AtomicReference<Long> schedulingStartTimeNs = new AtomicReference<>(null);
+    private final AtomicLong longestSchedulingTime = new AtomicLong();
+
     private final ILeaderElector leaderElector;
     private final AssignmentDistributionService assignmentsDistributer;
     private final AtomicReference<Map<String, String>> idToSchedStatus;
@@ -550,6 +567,7 @@ private static TimeCacheMap<String, OutputStream> fileCacheMap(Ma
         });
     }
 
+    //Not a symmetric difference. Performs A.entrySet() - B.entrySet()
     private static <K, V> Map<K, V> mapDiff(Map<K, V> first, Map<K, V> second) {
         Map<K, V> ret = new HashMap<>();
         for (Entry<K, V> entry : second.entrySet()) {
@@ -689,26 +707,20 @@ private static StormTopology readStormTopologyAsNimbus(String topoId, TopoCache
      * @return {topology-id -> {executor [node port]}} mapping
      */
     private static Map<String, Map<List<Long>, List<Object>>> computeTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments) {
+        Map<String, SchedulerAssignment> schedAssignments, List<String> assignedTopologyIds) {
         Map<String, Map<List<Long>, List<Object>>> ret = new HashMap<>();
         for (Entry<String, SchedulerAssignment> schedEntry : schedAssignments.entrySet()) {
             Map<List<Long>, List<Object>> execToNodePort = new HashMap<>();
             for (Entry<ExecutorDetails, WorkerSlot> execAndNodePort : schedEntry.getValue().getExecutorToSlot().entrySet()) {
                 ExecutorDetails exec = execAndNodePort.getKey();
                 WorkerSlot slot = execAndNodePort.getValue();
-
-                List<Long> listExec = new ArrayList<>(2);
-                listExec.add((long) exec.getStartTask());
-                listExec.add((long) exec.getEndTask());
-
-                List<Object> nodePort = new ArrayList<>(2);
-                nodePort.add(slot.getNodeId());
-                nodePort.add((long) slot.getPort());
-
-                execToNodePort.put(listExec, nodePort);
+                execToNodePort.put(exec.toList(), slot.toList());
             }
             ret.put(schedEntry.getKey(), execToNodePort);
         }
+        for (String id : assignedTopologyIds) {
+            ret.putIfAbsent(id, null);
+        }
         return ret;
     }
 
@@ -735,39 +747,95 @@ private static Map<String, Map<WorkerSlot, WorkerResources>> computeTopoToNodePo
         return ret;
     }
 
-    private static Map<String, Map<List<Long>, List<Object>>> computeNewTopoToExecToNodePort(
-        Map<String, SchedulerAssignment> schedAssignments, Map<String, Assignment> existingAssignments) {
-        Map<String, Map<List<Long>, List<Object>>> ret = computeTopoToExecToNodePort(schedAssignments);
-        // Print some useful information
-        if (existingAssignments != null && !existingAssignments.isEmpty()) {
-            for (Entry<String, Map<List<Long>, List<Object>>> entry : ret.entrySet()) {
-                String topoId = entry.getKey();
-                Map<List<Long>, List<Object>> execToNodePort = entry.getValue();
-                Assignment assignment = existingAssignments.get(topoId);
-                if (assignment == null) {
-                    continue;
+    private boolean auditAssignmentChanges(Map<String, Assignment> existingAssignments,
+                                           Map<String, Assignment> newAssignments) {
+        assert existingAssignments != null && newAssignments != null;
+        boolean anyChanged = existingAssignments.isEmpty() ^ newAssignments.isEmpty();
+        long numRemovedExec = 0;
+        long numRemovedSlot = 0;
+        long numAddedExec = 0;
+        long numAddedSlot = 0;
+        if (existingAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : newAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                LOG.info("Assign executors: {}", execToPort.keySet());
+                numAddedSlot += count;
+                numAddedExec += execToPort.size();
+            }
+        } else if (newAssignments.isEmpty()) {
+            for (Entry<String, Assignment> entry : existingAssignments.entrySet()) {
+                final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                final long count = new HashSet<>(execToPort.values()).size();
+                LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                LOG.info("Remove executors: {}", execToPort.keySet());
+                numRemovedSlot += count;
+                numRemovedExec += execToPort.size();
+            }
+        } else {
+            MapDifference<String, Assignment> difference = Maps.difference(existingAssignments, newAssignments);
+            if (anyChanged = !difference.areEqual()) {
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnLeft().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Removing {} from {} slots", entry.getKey(), count);
+                    LOG.info("Remove executors: {}", execToPort.keySet());
+                    numRemovedSlot += count;
+                    numRemovedExec += execToPort.size();
                 }
-                Map<List<Long>, NodeInfo> old = assignment.get_executor_node_port();
-                Map<List<Long>, List<Object>> reassigned = new HashMap<>();
-                for (Entry<List<Long>, List<Object>> execAndNodePort : execToNodePort.entrySet()) {
-                    NodeInfo oldAssigned = old.get(execAndNodePort.getKey());
-                    String node = (String) execAndNodePort.getValue().get(0);
-                    Long port = (Long) execAndNodePort.getValue().get(1);
-                    if (oldAssigned == null || !oldAssigned.get_node().equals(node)
-                        || !port.equals(oldAssigned.get_port_iterator().next())) {
-                        reassigned.put(execAndNodePort.getKey(), execAndNodePort.getValue());
-                    }
+                for (Entry<String, Assignment> entry : difference.entriesOnlyOnRight().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToPort = entry.getValue().get_executor_node_port();
+                    final long count = new HashSet<>(execToPort.values()).size();
+                    LOG.info("Assigning {} to {} slots", entry.getKey(), count);
+                    LOG.info("Assign executors: {}", execToPort.keySet());
+                    numAddedSlot += count;
+                    numAddedExec += execToPort.size();
                 }
+                for (Entry<String, MapDifference.ValueDifference<Assignment>> entry : difference.entriesDiffering().entrySet()) {
+                    final Map<List<Long>, NodeInfo> execToSlot = entry.getValue().rightValue().get_executor_node_port();
+                    final Set<NodeInfo> slots = new HashSet<>(execToSlot.values());
+                    LOG.info("Reassigning {} to {} slots", entry.getKey(), slots.size());
+                    LOG.info("Reassign executors: {}", execToSlot.keySet());
+
+                    final Map<List<Long>, NodeInfo> oldExecToSlot = entry.getValue().leftValue().get_executor_node_port();
+
+                    long commonExecCount = 0;
+                    Set<NodeInfo> commonSlots = new HashSet<>(execToSlot.size());
+                    for (Entry<List<Long>, NodeInfo> execEntry : execToSlot.entrySet()) {
+                        if (execEntry.getValue().equals(oldExecToSlot.get(execEntry.getKey()))) {
+                            commonExecCount++;
+                            commonSlots.add(execEntry.getValue());
+                        }
+                    }
+                    long commonSlotCount = commonSlots.size();
 
-                if (!reassigned.isEmpty()) {
-                    int count = (new HashSet<>(execToNodePort.values())).size();
-                    Set<List<Long>> reExecs = reassigned.keySet();
-                    LOG.info("Reassigning {} to {} slots", topoId, count);
-                    LOG.info("Reassign executors: {}", reExecs);
+                    //Treat a reassignment as a remove plus an add
+                    numRemovedSlot += new HashSet<>(oldExecToSlot.values()).size() - commonSlotCount;
+                    numRemovedExec += oldExecToSlot.size() - commonExecCount;
+                    numAddedSlot += slots.size() - commonSlotCount;
+                    numAddedExec += execToSlot.size() - commonExecCount;
                 }
             }
+            LOG.debug("{} assignments unchanged: {}", difference.entriesInCommon().size(), difference.entriesInCommon().keySet());
         }
-        return ret;
+        numAddedExecPerScheduling.update(numAddedExec);
+        numAddedSlotPerScheduling.update(numAddedSlot);
+        numRemovedExecPerScheduling.update(numRemovedExec);
+        numRemovedSlotPerScheduling.update(numRemovedSlot);
+        numNetExecIncreasePerScheduling.update(numAddedExec - numRemovedExec);
+        numNetSlotIncreasePerScheduling.update(numAddedSlot - numRemovedSlot);
+
+        if (anyChanged) {
+            LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
+            nodeIdToResources.get().forEach((id, node) ->
+                LOG.info(
+                    "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
+                        + "CPU: {}, Available CPU: {}, fragmented: {}",
+                    id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
+                    node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
+        }
+        return anyChanged;
     }
 
     private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map, Map<List<Long>,
@@ -780,7 +848,7 @@ private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map,
             key.add(ni.get_node());
             key.add(ni.get_port_iterator().next());
             List<List<Long>> value = new ArrayList<>(entry.getValue());
-            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+            value.sort(Comparator.comparing(a -> a.get(0)));
             slotAssigned.put(key, value);
         }
         HashMap<List<Object>, List<List<Long>>> tmpNewSlotAssigned = newExecToNodePort == null ? new HashMap<>() :
@@ -788,7 +856,7 @@ private static List<List<Long>> changedExecutors(Map<List<Long>, NodeInfo> map,
         HashMap<List<Object>, List<List<Long>>> newSlotAssigned = new HashMap<>();
         for (Entry<List<Object>, List<List<Long>>> entry : tmpNewSlotAssigned.entrySet()) {
             List<List<Long>> value = new ArrayList<>(entry.getValue());
-            value.sort((a, b) -> a.get(0).compareTo(b.get(0)));
+            value.sort(Comparator.comparing(a -> a.get(0)));
             newSlotAssigned.put(entry.getKey(), value);
         }
         Map<List<Object>, List<List<Long>>> diff = mapDiff(slotAssigned, newSlotAssigned);
@@ -1217,7 +1285,7 @@ private static Map<String, String> assignmentChangedNodes(Assignment oldAss, Ass
             return allNodeHost;
         } else {
             // rebalance
-            Map<String, String> ret = new HashMap<String, String>();
+            Map<String, String> ret = new HashMap<>();
             for (Map.Entry<List<Long>, NodeInfo> entry : newExecutorNodePort.entrySet()) {
                 NodeInfo newNodeInfo = entry.getValue();
                 NodeInfo oldNodeInfo = oldExecutorNodePort.get(entry.getKey());
@@ -1984,11 +2052,14 @@ private Map<String, SchedulerAssignment> computeNewSchedulerAssignments(Map<Stri
-        Map<String, SchedulerAssignment> newSchedulerAssignments = null;
         synchronized (schedLock) {
-            newSchedulerAssignments = computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
+            Map<String, SchedulerAssignment> newSchedulerAssignments =
+                computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
 
             Map<String, Map<List<Long>, List<Object>>> topologyToExecutorToNodePort =
-                computeNewTopoToExecToNodePort(newSchedulerAssignments, existingAssignments);
-            for (String id : assignedTopologyIds) {
-                if (!topologyToExecutorToNodePort.containsKey(id)) {
-                    topologyToExecutorToNodePort.put(id, null);
-                }
-            }
+                computeTopoToExecToNodePort(newSchedulerAssignments, assignedTopologyIds);
             Map<String, Map<WorkerSlot, WorkerResources>> newAssignedWorkerToResources =
                 computeTopoToNodePortToResources(newSchedulerAssignments);
             int nowSecs = Time.currentTimeSecs();
@@ -2154,14 +2220,12 @@ private void mkAssignments(String scratchTopoId) throws Exception {
                 if (execToNodePort == null) {
                     execToNodePort = new HashMap<>();
                 }
-                Assignment existingAssignment = existingAssignments.get(topoId);
                 Set<String> allNodes = new HashSet<>();
-                if (execToNodePort != null) {
-                    for (List<Object> nodePort : execToNodePort.values()) {
-                        allNodes.add((String) nodePort.get(0));
-                    }
+                for (List<Object> nodePort : execToNodePort.values()) {
+                    allNodes.add((String) nodePort.get(0));
                 }
                 Map<String, String> allNodeHost = new HashMap<>();
+                Assignment existingAssignment = existingAssignments.get(topoId);
                 if (existingAssignment != null) {
                     allNodeHost.putAll(existingAssignment.get_node_host());
                 }
@@ -2219,15 +2283,9 @@ private void mkAssignments(String scratchTopoId) throws Exception {
                 newAssignments.put(topoId, newAssignment);
             }
 
-            if (!newAssignments.equals(existingAssignments)) {
+            boolean assignmentChanged = auditAssignmentChanges(existingAssignments, newAssignments);
+            if (assignmentChanged) {
                 LOG.debug("RESETTING id->resources and id->worker-resources cache!");
-                LOG.info("Fragmentation after scheduling is: {} MB, {} PCore CPUs", fragmentedMemory(), fragmentedCpu());
-                nodeIdToResources.get().forEach((id, node) ->
-                    LOG.info(
-                        "Node Id: {} Total Mem: {}, Used Mem: {}, Available Mem: {}, Total CPU: {}, Used "
-                            + "CPU: {}, Available CPU: {}, fragmented: {}",
-                        id, node.getTotalMem(), node.getUsedMem(), node.getAvailableMem(),
-                        node.getTotalCpu(), node.getUsedCpu(), node.getAvailableCpu(), isFragmented(node)));
                 idToResources.set(new HashMap<>());
                 idToWorkerResources.set(new HashMap<>());
             }
@@ -2826,21 +2884,27 @@ public void launchServer() throws Exception {
                     .parallelStream()
                     .mapToDouble(SupervisorResources::getTotalCpu)
                     .sum());
-
+            StormMetricsRegistry.registerGauge("nimbus:longest-scheduling-time-ms", () -> {
+                //We want to update the longest scheduling time in real time in case the scheduler gets stuck
+                // Get current time before startTime to avoid a potential race with the scheduler's Timer
+                Long currTime = Time.nanoTime();
+                Long startTime = schedulingStartTimeNs.get();
+                return TimeUnit.NANOSECONDS.toMillis(startTime == null ?
+                    longestSchedulingTime.get() : Math.max(currTime - startTime, longestSchedulingTime.get()));
+            });
+
             StormMetricsRegistry.registerMeter("nimbus:num-launched").mark();
             StormMetricsRegistry.startMetricsReporters(conf);
-            if (clusterConsumerExceutors != null) {
-                timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
-                    () -> {
-                        try {
-                            if (isLeader()) {
-                                sendClusterMetricsToExecutors();
-                            }
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
+            timer.scheduleRecurring(0, ObjectReader.getInt(conf.get(DaemonConfig.STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS)),
+                () -> {
+                    try {
+                        if (isLeader()) {
+                            sendClusterMetricsToExecutors();
                         }
-                    });
-            }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
         } catch (Exception e) {
             if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                 throw e;
@@ -3689,7 +3753,7 @@ public String beginFileUpload() throws AuthorizationException, TException {
             beginFileUploadCalls.mark();
             checkAuthorization(null, null, "fileUpload");
             String fileloc = getInbox() + "/stormjar-" + Utils.uuid() + ".jar";
-            uploaders.put(fileloc, Channels.newChannel(new FileOutputStream(fileloc)));
+            uploaders.put(fileloc, new TimedWritableByteChannel(Channels.newChannel(new FileOutputStream(fileloc)), fileUploadDuration));
             LOG.info("Uploading file from client to {}", fileloc);
             return fileloc;
         } catch (Exception e) {
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
index 90d68dc1ad4..4619aeb8827 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/SupervisorUtils.java
@@ -114,20 +114,17 @@ public static Collection<String> supervisorWorkerIds(Map<String, Object> conf) {
      * @param conf
      * @return
      *
-     * @throws Exception
      */
-    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) throws Exception {
+    public static Map<String, LSWorkerHeartbeat> readWorkerHeartbeats(Map<String, Object> conf) {
         return _instance.readWorkerHeartbeatsImpl(conf);
     }
 
     /**
-     * get worker heartbeat by workerId
+     * get worker heartbeat by workerId.
      *
      * @param conf
      * @param workerId
      * @return
-     *
-     * @throws IOException
      */
     private static LSWorkerHeartbeat readWorkerHeartbeat(Map<String, Object> conf, String workerId) {
         return _instance.readWorkerHeartbeatImpl(conf, workerId);
     }
 
@@ -137,7 +134,7 @@ public static boolean isWorkerHbTimedOut(int now, LSWorkerHeartbeat whb, Map<Str
-    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) throws Exception {
+    public Map<String, LSWorkerHeartbeat> readWorkerHeartbeatsImpl(Map<String, Object> conf) {
         Map<String, LSWorkerHeartbeat> workerHeartbeats = new HashMap<>();
 
         Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf);
 
diff --git a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
index 952d8d9297b..f12713bc60e 100644
--- a/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
+++ b/storm-server/src/main/java/org/apache/storm/localizer/LocallyCachedBlob.java
@@ -52,8 +52,7 @@ public abstract class LocallyCachedBlob {
     private long lastUsed = Time.currentTimeMillis();
     private CompletableFuture<Void> doneUpdating = null;
 
-    private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram(
-        "supervisor:blob-fetching-rate-MB/s", new ExponentiallyDecayingReservoir());
+    private static final Histogram fetchingRate = StormMetricsRegistry.registerHistogram("supervisor:blob-fetching-rate-MB/s");
 
     /**
      * Create a new LocallyCachedBlob.
diff --git a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
index 602f53e326b..ea8867eda99 100644
--- a/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
+++ b/storm-server/src/main/java/org/apache/storm/metric/StormMetricsRegistry.java
@@ -18,18 +18,21 @@
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Reservoir;
 import com.codahale.metrics.Timer;
 import java.util.Map;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.daemon.metrics.MetricsUtils;
 import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class StormMetricsRegistry extends MetricRegistry {
-    private static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry();
+    @VisibleForTesting
+    static final StormMetricsRegistry REGISTRY = new StormMetricsRegistry();
     private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class);
 
     private StormMetricsRegistry() {/*Singleton pattern*/}
@@ -54,6 +57,25 @@ public static void registerMeter(String name, Meter meter) {
         REGISTRY.register(name, meter);
     }
 
+    public static void registerMetricSet(MetricSet metrics) {
+        REGISTRY.registerAll(metrics);
+    }
+
+    public static void unregisterMetricSet(MetricSet metrics) {
+        unregisterMetricSet(null, metrics);
+    }
+
+    public static void unregisterMetricSet(String prefix, MetricSet metrics) {
+        for (Map.Entry<String, Metric> entry : metrics.getMetrics().entrySet()) {
+            final String name = name(prefix, entry.getKey());
+            if (entry.getValue() instanceof MetricSet) {
+                unregisterMetricSet(name, (MetricSet) entry.getValue());
+            } else {
+                REGISTRY.remove(name);
+            }
+        }
+    }
+
     public static Timer registerTimer(String name) {
         return REGISTRY.register(name, new Timer());
     }
@@ -84,6 +106,7 @@ public static void startMetricsReporters(Map<String, Object> topoConf) {
      */
     @Override
     public <T extends Metric> T register(final String name, T metric) throws IllegalArgumentException {
+        assert !(metric instanceof MetricSet);
         try {
             return super.register(name, metric);
         } catch (IllegalArgumentException e) {
diff --git a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
index e54509e1e00..3783fdbad4d 100644
--- a/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
+++ b/storm-server/src/main/java/org/apache/storm/nimbus/LeaderListenerCallback.java
@@ -19,6 +19,8 @@
 import java.util.Set;
 import java.util.TreeSet;
 import javax.security.auth.Subject;
+
+import com.codahale.metrics.Meter;
 import org.apache.commons.io.IOUtils;
 import org.apache.storm.Config;
 import org.apache.storm.blobstore.BlobStore;
@@ -29,6 +31,7 @@
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.security.auth.ReqContext;
 import org.apache.storm.shade.com.google.common.base.Joiner;
 import org.apache.storm.shade.com.google.common.collect.Sets;
@@ -45,6 +48,8 @@
  * A callback function when nimbus gains leadership.
  */
 public class LeaderListenerCallback {
+    private static final Meter numGainedLeader = StormMetricsRegistry.registerMeter("nimbus:num-gained-leadership");
+    private static final Meter numLostLeader = StormMetricsRegistry.registerMeter("nimbus:num-lost-leadership");
     private static final Logger LOG = LoggerFactory.getLogger(LeaderListenerCallback.class);
     private static final String STORM_JAR_SUFFIX = "-stormjar.jar";
     private static final String STORM_CODE_SUFFIX = "-stormcode.ser";
@@ -82,6 +87,7 @@ public LeaderListenerCallback(Map conf, CuratorFramework zk, LeaderLatch leaderL
      * Invoke when gains leadership.
      */
     public void leaderCallBack() {
+        numGainedLeader.mark();
        //set up nimbus-info to zk
         setUpNimbusInfo(acls);
         //sync zk assignments/id-info to local
@@ -131,6 +137,7 @@ public void leaderCallBack() {
      * Invoke when lost leadership.
      */
     public void notLeaderCallback() {
+        numLostLeader.mark();
         tc.clear();
     }
 
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
index 3f48669d505..d0142368543 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java
@@ -145,11 +145,7 @@ private Cluster(
             String nodeId = entry.getKey();
             SupervisorDetails supervisor = entry.getValue();
             String host = supervisor.getHost();
-            List<String> ids = hostToId.get(host);
-            if (ids == null) {
-                ids = new ArrayList<>();
-                hostToId.put(host, ids);
-            }
+            List<String> ids = hostToId.computeIfAbsent(host, k -> new ArrayList<>());
             ids.add(nodeId);
         }
         this.conf = conf;
@@ -173,11 +169,7 @@ private Cluster(
             for (Map.Entry<String, String> entry : resolvedSuperVisors.entrySet()) {
                 String hostName = entry.getKey();
                 String rack = entry.getValue();
-                List<String> nodesForRack = this.networkTopography.get(rack);
-                if (nodesForRack == null) {
-                    nodesForRack = new ArrayList<>();
-                    this.networkTopography.put(rack, nodesForRack);
-                }
+                List<String> nodesForRack = this.networkTopography.computeIfAbsent(rack, k -> new ArrayList<>());
                 nodesForRack.add(hostName);
             }
         } else {
diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
index 855cc968548..18de717c751 100644
--- a/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
+++ b/storm-server/src/main/java/org/apache/storm/scheduler/ExecutorDetails.java
@@ -18,6 +18,9 @@
 
 package org.apache.storm.scheduler;
 
+import java.util.Arrays;
+import java.util.List;
+
 public class ExecutorDetails {
     public final int startTask;
     public final int endTask;
@@ -35,9 +38,13 @@ public int getEndTask() {
         return endTask;
     }
 
+    public List<Long> toList() {
+        return Arrays.asList((long) startTask, (long) endTask);
+    }
+
     @Override
     public boolean equals(Object other) {
-        if (other == null || !(other instanceof ExecutorDetails)) {
+        if (!(other instanceof ExecutorDetails)) {
             return false;
         }
 
diff --git a/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
new file mode 100644
index 00000000000..5d9b3e4525e
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/metric/StormMetricsRegistryTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.metric;
+
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
+import com.codahale.metrics.Timer;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.codahale.metrics.MetricRegistry.name;
+import static org.junit.jupiter.api.Assertions.*;
+
+class StormMetricsRegistryTest {
+    private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistryTest.class);
+
+    private static final String OUTER_METER = "outerMeter";
+    private static final String INNER_SET = "innerSet";
+    private static final String OUTER_TIMER = "outerTimer";
+    private static final String INNER_METER = "innerMeter";
+    private static final String INNER_TIMER = "innerTimer";
+    private static final MetricSet OUTER = newMetricSetInstance();
+
+    @Test
+    void registerMetricSet() {
+        Meter existingInnerMeter = StormMetricsRegistry.registerMeter(name(INNER_SET, INNER_METER));
+
+        LOG.info("register outer set");
+        StormMetricsRegistry.registerMetricSet(OUTER);
+        assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
+        assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
+        assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
+            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
+
+        assertNotSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_METER),
+            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+        assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+
+        //Ensure idempotency
+        LOG.info("twice register outer set");
+        MetricSet newOuter = newMetricSetInstance();
+        StormMetricsRegistry.registerMetricSet(newOuter);
+        assertSame(OUTER.getMetrics().get(OUTER_TIMER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_TIMER));
+        assertSame(OUTER.getMetrics().get(OUTER_METER), StormMetricsRegistry.REGISTRY.getMetrics().get(OUTER_METER));
+        assertSame(((MetricSet) OUTER.getMetrics().get(INNER_SET)).getMetrics().get(INNER_TIMER),
+            StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_TIMER)));
+        assertSame(existingInnerMeter, StormMetricsRegistry.REGISTRY.getMetrics().get(name(INNER_SET, INNER_METER)));
+
+        LOG.info("name collision");
+        assertThrows(IllegalArgumentException.class, () -> StormMetricsRegistry.registerGauge(name(INNER_SET, INNER_METER), () -> 0));
+    }
+
+    @Test
+    void unregisterMetricSet() {
+        StormMetricsRegistry.registerMetricSet(OUTER);
+        StormMetricsRegistry.unregisterMetricSet(OUTER);
+        assertTrue(StormMetricsRegistry.REGISTRY.getMetrics().isEmpty());
+
+    }
+
+    private static MetricSet newMetricSetInstance() {
+        return new MetricSet() {
+            private final MetricSet inner = new MetricSet() {
+                private final Map<String, Metric> map = new HashMap<>();
+
+                {
+                    map.put(INNER_METER, new Meter());
+                    map.put(INNER_TIMER, new Timer());
+                }
+
+                @Override
+                public Map<String, Metric> getMetrics() {
+                    return map;
+                }
+            };
+            private final Map<String, Metric> outerMap = new HashMap<>();
+
+            {
+                outerMap.put(OUTER_METER, new Meter());
+                outerMap.put(INNER_SET, inner);
+                outerMap.put(OUTER_TIMER, new Timer());
+            }
+
+            @Override
+            public Map<String, Metric> getMetrics() {
+                return outerMap;
+            }
+        };
+    }
+}
\ No newline at end of file
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
index 07ac14ba26f..07b971c0be6 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java
@@ -21,6 +21,8 @@
 import static org.apache.storm.DaemonConfig.UI_HEADER_BUFFER_BYTES;
 
 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricSet;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.File;
@@ -31,6 +33,7 @@
 
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.logviewer.utils.LogCleaner;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.daemon.logviewer.webapp.LogviewerApplication;
@@ -126,6 +129,7 @@ public LogviewerServer(Map<String, Object> conf) {
     void start() throws Exception {
         LOG.info("Starting Logviewer...");
         if (httpServer != null) {
+            StormMetricsRegistry.registerMetricSet(ExceptionMeters::getMetrics);
             httpServer.start();
         }
     }
@@ -165,7 +169,7 @@ public static void main(String [] args) throws Exception {
 
         try (LogviewerServer server = new LogviewerServer(conf);
              LogCleaner logCleaner = new LogCleaner(conf, workerLogs, directoryCleaner, logRootDir)) {
-            Utils.addShutdownHookWithForceKillIn1Sec(() -> server.close());
+            Utils.addShutdownHookWithForceKillIn1Sec(server::close);
             logCleaner.start();
             StormMetricsRegistry.startMetricsReporters(conf);
             server.start();
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
index 32e79eb6ea5..089d965004e 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogPageHandler.java
@@ -37,11 +37,13 @@
 import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang.StringEscapeUtils.escapeHtml;
 
+import com.codahale.metrics.Meter;
 import j2html.tags.DomContent;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.UncheckedIOException;
@@ -61,6 +63,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.storm.daemon.logviewer.LogviewerConstant;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
@@ -68,11 +71,13 @@
 import org.apache.storm.daemon.ui.UIHelpers;
 import org.apache.storm.daemon.utils.StreamUtil;
 import org.apache.storm.daemon.utils.UrlBuilder;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ConfigUtils;
 import org.apache.storm.utils.ServerUtils;
 import org.jooq.lambda.Unchecked;
 
 public class LogviewerLogPageHandler {
+    private static final Meter numPageRead = StormMetricsRegistry.registerMeter("logviewer:num-page-read");
     private final String logRoot;
     private final String daemonLogRoot;
     private final WorkerLogs workerLogs;
@@ -152,7 +157,7 @@ public Response listLogFiles(String user, Integer port, String topologyId, Strin
         List<String> files;
         if (fileResults != null) {
             files = fileResults.stream()
-                .map(file -> WorkerLogs.getTopologyPortWorkerLog(file))
+                .map(WorkerLogs::getTopologyPortWorkerLog)
                 .sorted().collect(toList());
         } else {
             files = new ArrayList<>();
@@ -162,11 +167,12 @@ public Response listLogFiles(String user, Integer port, String topologyId, Strin
     }
 
     /**
-     * Provides a worker log file to view.
+     * Provides a worker log file to view, starting from the specified position
+     * or the default starting position of the most recent page.
      *
      * @param fileName file to view
-     * @param start start offset, can be null
-     * @param length length to read in this page, can be null
+     * @param start start offset, or null if the most recent page is desired
+     * @param length length to read in this page, or null if the default page length is desired
      * @param grep search string if request is a result of the search, can be null
      * @param user username
      * @return HTML view page of worker log
@@ -179,7 +185,6 @@ public Response logPage(String fileName, Integer start, Integer length, String g
 
         File file = new File(rootDir, fileName).getCanonicalFile();
         String path = file.getCanonicalPath();
-        boolean isZipFile = path.endsWith(".gz");
         File topoDir = file.getParentFile().getParentFile();
 
         if (file.exists() && new File(rootDir).getCanonicalFile().equals(topoDir.getParentFile())) {
@@ -193,24 +198,21 @@ public Response logPage(String fileName, Integer start, Integer length, String g
                 throw e.getCause();
             }
 
-            List<String> filesStrWithoutFileParam = logFiles.stream().map(WorkerLogs::getTopologyPortWorkerLog)
-                .filter(fileStr -> !StringUtils.equals(fileName, fileStr)).collect(toList());
-
-            List<String> reorderedFilesStr = new ArrayList<>();
-            reorderedFilesStr.addAll(filesStrWithoutFileParam);
+            List<String> reorderedFilesStr = logFiles.stream()
+                .map(WorkerLogs::getTopologyPortWorkerLog)
+                .filter(fileStr -> !StringUtils.equals(fileName, fileStr))
+                .collect(toList());
             reorderedFilesStr.add(fileName);
 
             length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-            String logString;
-            if (isTxtFile(fileName)) {
-                logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
-            } else {
-                logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+            final boolean isZipFile = path.endsWith(".gz");
+            long fileLength = getFileLength(file, isZipFile);
+            if (start == null) {
+                start = Long.valueOf(fileLength - length).intValue();
             }
 
-            long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
-            start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+            String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) :
+                escapeHtml("This is a binary file and cannot be displayed! You may download the full file.");
 
             List<DomContent> bodyContents = new ArrayList<>();
             if (StringUtils.isNotEmpty(grep)) {
@@ -254,8 +256,8 @@ public Response logPage(String fileName, Integer start, Integer length, String g
      * Provides a daemon log file to view.
      *
      * @param fileName file to view
-     * @param start start offset, can be null
-     * @param length length to read in this page, can be null
+     * @param start start offset, or null if the most recent page is desired
+     * @param length length to read in this page, or null if the default page length is desired
      * @param grep search string if request is a result of the search, can be null
      * @param user username
      * @return HTML view page of daemon log
@@ -265,7 +267,6 @@ public Response daemonLogPage(String fileName, Integer start, Integer length, St
         String rootDir = daemonLogRoot;
         File file = new File(rootDir, fileName).getCanonicalFile();
         String path = file.getCanonicalPath();
-        boolean isZipFile = path.endsWith(".gz");
 
         if (file.exists() && new File(rootDir).getCanonicalFile().equals(file.getParentFile())) {
             // all types of files included
@@ -273,24 +274,21 @@ public Response daemonLogPage(String fileName, Integer start, Integer length, St
                 .filter(File::isFile)
                 .collect(toList());
 
-            List<String> filesStrWithoutFileParam = logFiles.stream()
-                .map(File::getName).filter(fName -> !StringUtils.equals(fileName, fName)).collect(toList());
-
-            List<String> reorderedFilesStr = new ArrayList<>();
-            reorderedFilesStr.addAll(filesStrWithoutFileParam);
+            List<String> reorderedFilesStr = logFiles.stream()
+                .map(File::getName)
+                .filter(fName -> !StringUtils.equals(fileName, fName))
+                .collect(toList());
             reorderedFilesStr.add(fileName);
 
             length = length != null ? Math.min(10485760, length) : LogviewerConstant.DEFAULT_BYTES_PER_PAGE;
-
-            String logString;
-            if (isTxtFile(fileName)) {
-                logString = escapeHtml(start != null ? pageFile(path, start, length) : pageFile(path, length));
-            } else {
-                logString = escapeHtml("This is a binary file and cannot display! You may download the full file.");
+            final boolean isZipFile = path.endsWith(".gz");
+            long fileLength = getFileLength(file, isZipFile);
+            if (start == null) {
+                start = Long.valueOf(fileLength - length).intValue();
             }
 
-            long fileLength = isZipFile ? ServerUtils.zipFileSize(file) : file.length();
-            start = start != null ? start : Long.valueOf(fileLength - length).intValue();
+            String logString = isTxtFile(fileName) ? escapeHtml(pageFile(path, isZipFile, fileLength, start, length)) :
+                escapeHtml("This is a binary file and cannot be displayed! You may download the full file.");
 
             List<DomContent> bodyContents = new ArrayList<>();
             if (StringUtils.isNotEmpty(grep)) {
@@ -323,6 +321,18 @@ public Response daemonLogPage(String fileName, Integer start, Integer length, St
         }
     }
 
+    private long getFileLength(File file, boolean isZipFile) throws IOException {
+        try {
+            return isZipFile ? ServerUtils.zipFileSize(file) : file.length();
+        } catch (FileNotFoundException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+            throw e;
+        }
+    }
+
     private DomContent logTemplate(List<DomContent> bodyContents, String fileName, String user) {
         List<DomContent> finalBodyContents = new ArrayList<>();
 
@@ -426,17 +436,8 @@ private DomContent toButtonLink(String url, String text, boolean enabled) {
         return a(text).withHref(url).withClass("btn btn-default " + (enabled ? "enabled" : "disabled"));
     }
 
-    private String pageFile(String path, Integer tail) throws IOException, InvalidRequestException {
-        boolean isZipFile = path.endsWith(".gz");
-        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
-        long skip = fileLength - tail;
-        return pageFile(path, Long.valueOf(skip).intValue(), tail);
-    }
-
-    private String pageFile(String path, Integer start, Integer length) throws IOException, InvalidRequestException {
-        boolean isZipFile = path.endsWith(".gz");
-        long fileLength = isZipFile ? ServerUtils.zipFileSize(new File(path)) : new File(path).length();
-
+    private String pageFile(String path, boolean isZipFile, long fileLength, Integer start, Integer readLength)
+        throws IOException, InvalidRequestException {
         try (InputStream input = isZipFile ? new GZIPInputStream(new FileInputStream(path)) : new FileInputStream(path);
              ByteArrayOutputStream output = new ByteArrayOutputStream()) {
             if (start >= fileLength) {
@@ -447,8 +448,8 @@ private String pageFile(String path, Integer start, Integer length) throws IOExc
             }
 
             byte[] buffer = new byte[1024];
-            while (output.size() < length) {
-                int size = input.read(buffer, 0, Math.min(1024, length - output.size()));
+            while (output.size() < readLength) {
+                int size = input.read(buffer, 0, Math.min(1024, readLength - output.size()));
                 if (size > 0) {
                     output.write(buffer, 0, size);
                 } else {
@@ -456,7 +457,14 @@ private String pageFile(String path, Integer start, Integer length) throws IOExc
                 }
             }
 
+            numPageRead.mark();
             return output.toString();
+        } catch (FileNotFoundException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
+            throw e;
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+            throw e;
         }
     }
 
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
index a26396c1554..bcde077cfd8 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandler.java
@@ -25,6 +25,10 @@
 import static org.apache.storm.daemon.utils.ListFunctionalSupport.rest;
 import static org.apache.storm.daemon.utils.PathUtil.truncatePathToLastElements;
 
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
@@ -46,6 +50,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 import java.util.zip.GZIPInputStream;
 
 import javax.ws.rs.core.Response;
@@ -56,12 +61,14 @@
 import org.apache.storm.daemon.common.JsonResponseBuilder;
 import org.apache.storm.daemon.logviewer.LogviewerConstant;
 import org.apache.storm.daemon.logviewer.utils.DirectoryCleaner;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.logviewer.utils.LogviewerResponseBuilder;
 import org.apache.storm.daemon.logviewer.utils.ResourceAuthorizer;
 import org.apache.storm.daemon.logviewer.utils.WorkerLogs;
 import org.apache.storm.daemon.ui.InvalidRequestException;
 import org.apache.storm.daemon.utils.StreamUtil;
 import org.apache.storm.daemon.utils.UrlBuilder;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Utils;
@@ -71,6 +78,9 @@
 public class LogviewerLogSearchHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(LogviewerLogSearchHandler.class);
+    private static final Meter numDeepSearchNoResult = StormMetricsRegistry.registerMeter("logviewer:num-deep-search-no-result");
+    private static final Histogram numFileScanned = StormMetricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search");
+    private static final Meter numSearchRequestNoResult = StormMetricsRegistry.registerMeter("logviewer:num-search-request-no-result");
 
     public static final int GREP_MAX_SEARCH_SIZE = 1024;
     public static final int GREP_BUF_SIZE = 2048;
@@ -124,6 +134,8 @@ public LogviewerLogSearchHandler(Map<String, Object> stormConf, String logRoot,
     public Response searchLogFile(String fileName, String user, boolean isDaemon, String search,
                                   String numMatchesStr, String offsetStr, String callback, String origin)
         throws IOException, InvalidRequestException {
+        boolean noResult = true;
+
         String rootDir = isDaemon ? daemonLogRoot : logRoot;
         File file = new File(rootDir, fileName).getCanonicalFile();
         Response response;
@@ -136,7 +148,9 @@ public Response searchLogFile(String fileName, String user, boolean isDaemon, St
                 if (StringUtils.isNotEmpty(search) && search.getBytes("UTF-8").length <= GREP_MAX_SEARCH_SIZE) {
                     Map<String, Object> entity = new HashMap<>();
                     entity.put("isDaemon", isDaemon ? "yes" : "no");
-                    entity.putAll(substringSearch(file, search, isDaemon, numMatchesInt, offsetInt));
+                    Map<String, Object> res = substringSearch(file, search, isDaemon, numMatchesInt, offsetInt);
+                    entity.putAll(res);
+                    noResult = ((List) res.get("matches")).isEmpty();
 
                     response = LogviewerResponseBuilder.buildSuccessJsonResponse(entity, callback, origin);
                 } else {
@@ -159,16 +173,20 @@ public Response searchLogFile(String fileName, String user, boolean isDaemon, St
             response = new JsonResponseBuilder().setData(entity).setCallback(callback).setStatus(404).build();
         }
 
+        if (noResult) {
+            numSearchRequestNoResult.mark();
+        }
         return response;
     }
 
     /**
-     * Deep search across worker log files in a topology.
+     * Advanced search across worker log files in a topology.
      *
      * @param topologyId topology ID
      * @param user username
      * @param search search string
-     * @param numMatchesStr the count of maximum matches
+     * @param numMatchesStr the count of maximum matches. Note that this number is with respect to
+     *                      each port, not to each log file or each search request
     * @param portStr worker port, null or '*' if the request wants to search from all worker logs
      * @param fileOffsetStr index (offset) of the log files
      * @param offsetStr start offset for log file
@@ -180,6 +198,9 @@ public Response searchLogFile(String fileName, String user, boolean isDaemon, St
     public Response deepSearchLogsForTopology(String topologyId, String user, String search,
                                               String numMatchesStr, String portStr, String fileOffsetStr, String offsetStr,
                                               Boolean searchArchived, String callback, String origin) {
+        int numMatchedFiles = 0;
+        int numScannedFiles = 0;
+
         String rootDir = logRoot;
         Object returnValue;
         File topologyDir = new File(rootDir, topologyId);
@@ -200,24 +221,24 @@ public Response deepSearchLogsForTopology(String topologyId, String user, String
 
             if (StringUtils.isEmpty(portStr) || portStr.equals("*")) {
                 // check for all ports
-                List<List<File>> filteredLogs = portDirs.stream()
-                    .map(portDir -> logsForPort(user, portDir))
-                    .filter(logs -> logs != null && !logs.isEmpty())
-                    .collect(toList());
+                Stream<List<File>> portsOfLogs = portDirs.stream()
+                    .map(portDir -> logsForPort(user, portDir))
+                    .filter(logs -> logs != null && !logs.isEmpty());
 
-                if (BooleanUtils.isTrue(searchArchived)) {
-                    returnValue = filteredLogs.stream()
-                        .map(fl -> findNMatches(fl, numMatches, 0, 0, search))
-                        .collect(toList());
-                } else {
-                    returnValue = filteredLogs.stream()
-                        .map(fl -> Collections.singletonList(first(fl)))
-                        .map(fl -> findNMatches(fl, numMatches, 0, 0, search))
-                        .collect(toList());
+                if (BooleanUtils.isNotTrue(searchArchived)) {
+                    portsOfLogs = portsOfLogs.map(fl -> Collections.singletonList(first(fl)));
                 }
+
+                final List<Matched> matchedList = portsOfLogs
+                    .map(logs -> findNMatches(logs, numMatches, 0, 0, search))
+                    .collect(toList());
+                numMatchedFiles = matchedList.stream().mapToInt(match -> match.getMatches().size()).sum();
+                numScannedFiles = matchedList.stream().mapToInt(match -> match.openedFiles).sum();
+                returnValue = matchedList;
             } else {
                 int port = Integer.parseInt(portStr);
                 // check just the one port
+                @SuppressWarnings("unchecked")
                 List<Integer> slotsPorts = (List<Integer>) stormConf.getOrDefault(DaemonConfig.SUPERVISOR_SLOTS_PORTS,
                     new ArrayList<>());
                 boolean containsPort = slotsPorts.stream()
@@ -232,17 +253,22 @@ public Response deepSearchLogsForTopology(String topologyId, String user, String
                     returnValue = new ArrayList<>();
                 } else {
                     List<File> filteredLogs = logsForPort(user, portDir);
-                    if (BooleanUtils.isTrue(searchArchived)) {
-                        returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
-                    } else {
-                        returnValue = findNMatches(Collections.singletonList(first(filteredLogs)),
-                            numMatches, 0, offset, search);
+                    if (BooleanUtils.isNotTrue(searchArchived)) {
+                        filteredLogs = Collections.singletonList(first(filteredLogs));
+                        fileOffset = 0;
                     }
+                    returnValue = findNMatches(filteredLogs, numMatches, fileOffset, offset, search);
+                    numMatchedFiles = ((Matched) returnValue).getMatches().size();
+                    numScannedFiles = ((Matched) returnValue).openedFiles;
                 }
             }
         }
 
+        if (numMatchedFiles == 0) {
+            numDeepSearchNoResult.mark();
+        }
+        numFileScanned.update(numScannedFiles);
         return LogviewerResponseBuilder.buildSuccessJsonResponse(returnValue, callback, origin);
     }
 
@@ -271,26 +297,21 @@ Map<String, Object> substringSearch(File file, String searchString, int numMatche
 
     private Map<String, Object> substringSearch(File file, String searchString, boolean isDaemon, Integer numMatches,
                                                 Integer startByteOffset) throws InvalidRequestException {
-        try {
-            if (StringUtils.isEmpty(searchString)) {
-                throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
-            }
-            if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
-                throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
-                    + GREP_MAX_SEARCH_SIZE);
-            }
+        if (StringUtils.isEmpty(searchString)) {
+            throw new IllegalArgumentException("Precondition fails: search string should not be empty.");
+        }
+        if (searchString.getBytes(StandardCharsets.UTF_8).length > GREP_MAX_SEARCH_SIZE) {
+            throw new IllegalArgumentException("Precondition fails: the length of search string should be less than "
+                + GREP_MAX_SEARCH_SIZE);
+        }
 
-            boolean isZipFile = file.getName().endsWith(".gz");
-            try (InputStream fis = Files.newInputStream(file.toPath());
-                 InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
-                 BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
+        boolean isZipFile = file.getName().endsWith(".gz");
+        try (InputStream fis = Files.newInputStream(file.toPath())) {
+            try (InputStream gzippedInputStream = isZipFile ? new GZIPInputStream(fis) : fis;
+                 BufferedInputStream stream = new BufferedInputStream(gzippedInputStream)) {
 
-                int fileLength;
-                if (isZipFile) {
-                    fileLength = (int) ServerUtils.zipFileSize(file);
-                } else {
-                    fileLength = (int) file.length();
-                }
+                //It's more likely to be a file read exception here, so we don't differentiate
+                int fileLength = isZipFile ? (int) ServerUtils.zipFileSize(file) : (int) file.length();
 
                 ByteBuffer buf = ByteBuffer.allocate(GREP_BUF_SIZE);
                 final byte[] bufArray = buf.array();
@@ -311,7 +332,7 @@ private Map<String, Object> substringSearch(File file, String searchString, boole
                 Arrays.fill(bufArray, (byte) 0);
 
                 int totalBytesRead = 0;
-                int bytesRead = stream.read(bufArray, 0, Math.min((int) fileLength, GREP_BUF_SIZE));
+                int bytesRead = stream.read(bufArray, 0, Math.min(fileLength, GREP_BUF_SIZE));
                 buf.limit(bytesRead);
                 totalBytesRead += bytesRead;
 
@@ -335,7 +356,7 @@ private Map<String, Object> substringSearch(File file, String searchString, boole
                         // buffer on the previous read.
                         final int newBufOffset = Math.min(buf.limit(), GREP_MAX_SEARCH_SIZE) - searchBytes.length;
 
-                        totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, file, fileLength);
+                        totalBytesRead = rotateGrepBuffer(buf, stream, totalBytesRead, fileLength);
                         if (totalBytesRead < 0) {
                             throw new InvalidRequestException("Cannot search past the end of the file");
                         }
@@ -358,8 +379,14 @@ private Map<String, Object> substringSearch(File file, String searchString, boole
                     }
                 }
                 return ret;
+            } catch (UnknownHostException | UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            } catch (IOException e) {
+                ExceptionMeters.NUM_FILE_READ_EXCEPTIONS.mark();
+                throw new RuntimeException(e);
             }
         } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark();
             throw new RuntimeException(e);
         }
     }
@@ -388,32 +415,46 @@ List<File> logsForPort(String user, File portDir) {
         }
     }
 
+    /**
+     * Find the first N matches of the target string in the given files.
+     * @param logs all candidate log files to search
+     * @param numMatches the maximum number of matches expected
+     * @param fileOffset the number of log files to skip initially
+     * @param startByteOffset the number of bytes to skip in each log file
+     * @param targetStr the string to search for
+     * @return all matched results
+     */
     @VisibleForTesting
-    Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset, String search) {
+    Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int startByteOffset, String targetStr) {
         logs = drop(logs, fileOffset);
+        LOG.debug("{} files to scan", logs.size());
 
         List<Map<String, Object>> matches = new ArrayList<>();
         int matchCount = 0;
+        int scannedFiles = 0;
 
         while (true) {
             if (logs.isEmpty()) {
+                //fileOffset = one past the last scanned file
                 break;
             }
 
             File firstLog = logs.get(0);
-            Map<String, Object> theseMatches;
+            Map<String, Object> matchInLog;
             try {
                 LOG.debug("Looking through {}", firstLog);
-                theseMatches = substringSearch(firstLog, search, numMatches - matchCount, offset);
+                matchInLog = substringSearch(firstLog, targetStr, numMatches - matchCount, startByteOffset);
+                scannedFiles++;
             } catch (InvalidRequestException e) {
                 LOG.error("Can't search past end of file.", e);
-                theseMatches = new HashMap<>();
+                matchInLog = new HashMap<>();
             }
 
             String fileName = WorkerLogs.getTopologyPortWorkerLog(firstLog);
 
+            //Record the formatted log file name and corresponding port in the match.
             final List<Map<String, Object>> newMatches = new ArrayList<>(matches);
-            Map<String, Object> currentFileMatch = new HashMap<>(theseMatches);
+            Map<String, Object> currentFileMatch = new HashMap<>(matchInLog);
             currentFileMatch.put("fileName", fileName);
             Path firstLogAbsPath;
             try {
@@ -424,27 +465,27 @@ Matched findNMatches(List<File> logs, int numMatches, int fileOffset, int offset
             currentFileMatch.put("port", truncatePathToLastElements(firstLogAbsPath, 2).getName(0).toString());
             newMatches.add(currentFileMatch);
 
-            int newCount = matchCount + ((List) theseMatches.get("matches")).size();
-
-            //theseMatches is never empty! As guaranteed by the #get().size() method above
+            int newCount = matchCount + ((List) matchInLog.get("matches")).size();
             if (newCount == matchCount) {
                 // matches and matchCount is not changed
                 logs = rest(logs);
-                offset = 0;
+                startByteOffset = 0;
                 fileOffset = fileOffset + 1;
             } else if (newCount >= numMatches) {
                 matches = newMatches;
+                //fileOffset = the index of the last scanned file
                 break;
             } else {
                 matches = newMatches;
                 logs = rest(logs);
-                offset = 0;
+                startByteOffset = 0;
                 fileOffset = fileOffset + 1;
                 matchCount = newCount;
             }
         }
 
-        return new Matched(fileOffset, search, matches);
+        LOG.debug("scanned {} files", scannedFiles);
+        return new Matched(fileOffset, targetStr, matches, scannedFiles);
     }
 
@@ -502,8 +543,7 @@ private SubstringSearchResult bufferSubstringSearch(boolean isDaemon, File file,
         return new SubstringSearchResult(matches, newByteOffset, newBeforeBytes);
     }
 
-    private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, File file,
-                                 int fileLength) throws IOException {
+    private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int totalBytesRead, int fileLength) throws IOException {
         byte[] bufArray = buf.array();
 
         // Copy the 2nd half of the buffer to the first half.
@@ -513,7 +553,7 @@ private int rotateGrepBuffer(ByteBuffer buf, BufferedInputStream stream, int tot
         Arrays.fill(bufArray, GREP_MAX_SEARCH_SIZE, bufArray.length, (byte) 0);
 
         // Fill the 2nd half with new bytes from the stream.
-        int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min((int) fileLength, GREP_MAX_SEARCH_SIZE));
+        int bytesRead = stream.read(bufArray, GREP_MAX_SEARCH_SIZE, Math.min(fileLength, GREP_MAX_SEARCH_SIZE));
         buf.limit(GREP_MAX_SEARCH_SIZE + bytesRead);
         return totalBytesRead + bytesRead;
     }
@@ -693,18 +733,21 @@ public static class Matched implements JSONAware {
         private int fileOffset;
         private String searchString;
         private List<Map<String, Object>> matches;
+        @JsonIgnore
+        private final int openedFiles;

         /**
          * Constructor.
-         *
-         * @param fileOffset offset (index) of the files
+         * @param fileOffset offset (index) of the files
          * @param searchString search string
          * @param matches map representing matched search result
+         * @param openedFiles number of files scanned, used for metrics only
          */
-        public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches) {
+        public Matched(int fileOffset, String searchString, List<Map<String, Object>> matches, int openedFiles) {
             this.fileOffset = fileOffset;
             this.searchString = searchString;
             this.matches = matches;
+            this.openedFiles = openedFiles;
         }

         public int getFileOffset() {
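Note on the new field: openedFiles is excluded from the JSON payload via @JsonIgnore and exists only so callers can report how many files a search touched. A minimal sketch of such a consumer inside the handler follows; the NUM_FILES_SCANNED field and its metric name are illustrative assumptions, not part of this patch:

    // Hypothetical consumer of Matched.openedFiles; field and metric name are illustrative only.
    private static final Histogram NUM_FILES_SCANNED =
        StormMetricsRegistry.registerHistogram("logviewer:num-files-scanned-per-deep-search");

    private Matched searchAndRecord(List<File> logs, int numMatches, int fileOffset,
                                    int startByteOffset, String targetStr) {
        Matched matched = findNMatches(logs, numMatches, fileOffset, startByteOffset, targetStr);
        NUM_FILES_SCANNED.update(matched.openedFiles); // histogram of files touched per search
        return matched;
    }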
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
new file mode 100644
index 00000000000..9e0afd97e96
--- /dev/null
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/DeletionMeta.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.logviewer.utils;
+
+class DeletionMeta {
+    static final DeletionMeta EMPTY = new DeletionMeta(0, 0);
+
+    final long deletedSize;
+    final int deletedFiles;
+
+    DeletionMeta(long deletedSize, int deletedFiles) {
+        this.deletedSize = deletedSize;
+        this.deletedFiles = deletedFiles;
+    }
+}
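DeletionMeta is a plain value pair that lets deleteOldestWhileTooLarge report both dimensions of a cleanup pass at once; LogCleaner (below) sums several of them into per-run totals. In isolation:

    // Aggregate two cleanup passes into run-level totals.
    DeletionMeta perDir = new DeletionMeta(2048L, 3);   // 2048 bytes freed, 3 files deleted
    DeletionMeta global = new DeletionMeta(4096L, 7);   // 4096 bytes freed, 7 files deleted
    long bytesFreed = perDir.deletedSize + global.deletedSize;    // 6144
    int filesDeleted = perDir.deletedFiles + global.deletedFiles; // 10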
"this directory" : "root directory", toDeleteSize * 1e-6); } } - return deletedFiles; + return new DeletionMeta(deletedSize, deletedFiles); } private boolean isFileEligibleToSkipDelete(boolean forPerDir, Set activeDirs, File dir, File file) throws IOException { @@ -186,7 +191,11 @@ public static List getFilesForDir(File dir) throws IOException { break; } } + } catch (IOException e) { + ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); + throw e; } return files; } + } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java new file mode 100644 index 00000000000..81aa2223f3f --- /dev/null +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/ExceptionMeters.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. + * See the NOTICE file distributed with this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + */ + +package org.apache.storm.daemon.logviewer.utils; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricSet; + +import java.util.HashMap; +import java.util.Map; + +public enum ExceptionMeters { + //Operation level IO Exceptions + NUM_FILE_OPEN_EXCEPTIONS("logviewer:num-file-open-exceptions"), + NUM_FILE_READ_EXCEPTIONS("logviewer:num-file-read-exceptions"), + NUM_FILE_REMOVAL_EXCEPTIONS("logviewer:num-file-removal-exceptions"), + NUM_FILE_DOWNLOAD_EXCEPTIONS("logviewer:num-file-download-exceptions"), + NUM_SET_PERMISSION_EXCEPTIONS("logviewer:num-set-permission-exceptions"), + + //Routine level + NUM_CLEANUP_EXCEPTIONS("logviewer:num-other-cleanup-exceptions"), + NUM_READ_LOG_EXCEPTIONS("logviewer:num-read-log-exceptions"), + NUM_READ_DAEMON_LOG_EXCEPTIONS("logviewer:num-read-daemon-log-exceptions"), + NUM_LIST_LOG_EXCEPTIONS("logviewer:num-search-log-exceptions"), + NUM_LIST_DUMP_EXCEPTIONS("logviewer:num-list-dump-files-exceptions"), + NUM_DOWNLOAD_DUMP_EXCEPTIONS("logviewer:num-download-dump-exceptions"), + NUM_DOWNLOAD_LOG_EXCEPTIONS("logviewer:num-download-log-exceptions"), + NUM_DOWNLOAD_DAEMON_LOG_EXCEPTIONS("logviewer:num-download-daemon-log-exceptions"), + NUM_SEARCH_EXCEPTIONS("logviewer:num-search-exceptions"); + + private static final Map metrics = new HashMap<>(); + + static { + for (ExceptionMeters e : ExceptionMeters.values()) { + metrics.put(e.name, e.meter); + } + } + + private final String name; + private final Meter meter; + + public static Map getMetrics() { + return metrics; + } + + ExceptionMeters(String name) { + this.name = name; + meter = new Meter(); + } + + public void mark() { + this.meter.mark(); + } +} diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java index b8468ec3af6..ae8aff68b4a 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java @@ -27,6 +27,8 @@ import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB; import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import java.io.Closeable; @@ -48,8 +50,10 @@ import java.util.function.BinaryOperator; import java.util.stream.StreamSupport; +import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.storm.StormTimer; +import org.apache.storm.metric.StormMetricsRegistry; import org.apache.storm.utils.ObjectReader; import org.apache.storm.utils.Time; import org.apache.storm.utils.Utils; @@ -62,6 +66,9 @@ */ public class LogCleaner implements Runnable, Closeable { private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); + private static final Timer cleanupRoutineDuration = StormMetricsRegistry.registerTimer("logviewer:cleanup-routine-duration-ms"); + private static final Histogram numFilesCleanedUp = 
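Since getMetrics() returns a Map<String, Metric> keyed by meter name, the whole enum can be registered in one call: MetricSet declares exactly that single method, so a method reference suffices. The standalone codahale registry below is for illustration only; the daemon would wire these through StormMetricsRegistry's MetricSet registration instead:

    // Illustrative registration against a plain codahale MetricRegistry.
    MetricRegistry registry = new MetricRegistry();
    // MetricSet has one method, Map<String, Metric> getMetrics(), so the
    // enum's static map can be registered wholesale.
    registry.registerAll(ExceptionMeters::getMetrics);
    ExceptionMeters.NUM_FILE_OPEN_EXCEPTIONS.mark(); // counted under "logviewer:num-file-open-exceptions"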
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
index b8468ec3af6..ae8aff68b4a 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java
@@ -27,6 +27,8 @@
 import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB;
 import static org.apache.storm.DaemonConfig.LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB;

+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;

 import java.io.Closeable;
@@ -48,8 +50,10 @@
 import java.util.function.BinaryOperator;
 import java.util.stream.StreamSupport;

+import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.storm.StormTimer;
+import org.apache.storm.metric.StormMetricsRegistry;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Utils;
@@ -62,6 +66,9 @@
  */
 public class LogCleaner implements Runnable, Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
+    private static final Timer cleanupRoutineDuration = StormMetricsRegistry.registerTimer("logviewer:cleanup-routine-duration-ms");
+    private static final Histogram numFilesCleanedUp =
+        StormMetricsRegistry.registerHistogram("logviewer:num-files-cleaned-up");
+    private static final Histogram diskSpaceFreed = StormMetricsRegistry.registerHistogram("logviewer:disk-space-freed-in-bytes");

     private final Map<String, Object> stormConf;
     private final Integer intervalSecs;
@@ -95,6 +102,8 @@ public LogCleaner(Map<String, Object> stormConf, WorkerLogs workerLogs, Director
         LOG.info("configured max total size of worker logs: {} MB, max total size of worker logs per directory: {} MB",
             maxSumWorkerLogsSizeMb, maxPerWorkerLogsSizeMb);

+        //Switch to CachedGauge if this starts to hurt performance
+        StormMetricsRegistry.registerGauge("logviewer:worker-log-dir-size", () -> FileUtils.sizeOf(logRootDir));
     }

     /**
@@ -131,7 +140,9 @@ public void close() {
      */
     @Override
     public void run() {
-        try {
+        int numFilesCleaned = 0;
+        long diskSpaceCleaned = 0L;
+        try (Timer.Context t = cleanupRoutineDuration.time()) {
             final long nowMills = Time.currentTimeMillis();

             Set<File> oldLogDirs = selectDirsForCleanup(nowMills);
@@ -142,30 +153,41 @@ public void run() {
                 oldLogDirs.stream().map(File::getName).collect(joining(",")),
                 deadWorkerDirs.stream().map(File::getName).collect(joining(",")));

-            deadWorkerDirs.forEach(Unchecked.consumer(dir -> {
+            for (File dir : deadWorkerDirs) {
                 String path = dir.getCanonicalPath();
-                LOG.info("Cleaning up: Removing {}", path);
+                long sizeInBytes = FileUtils.sizeOf(dir);
+                LOG.info("Cleaning up: Removing {}, {} KB", path, sizeInBytes * 1e-3);

                 try {
                     Utils.forceDelete(path);
                     cleanupEmptyTopoDirectory(dir);
+                    numFilesCleaned++;
+                    diskSpaceCleaned += sizeInBytes;
                 } catch (Exception ex) {
+                    ExceptionMeters.NUM_FILE_REMOVAL_EXCEPTIONS.mark();
                     LOG.error(ex.getMessage(), ex);
                 }
-            }));
+            }

-            perWorkerDirCleanup(maxPerWorkerLogsSizeMb * 1024 * 1024);
-            globalLogCleanup(maxSumWorkerLogsSizeMb * 1024 * 1024);
+            final List<DeletionMeta> perWorkerDirCleanupMeta = perWorkerDirCleanup(maxPerWorkerLogsSizeMb * 1024 * 1024);
+            numFilesCleaned += perWorkerDirCleanupMeta.stream().mapToInt(meta -> meta.deletedFiles).sum();
+            diskSpaceCleaned += perWorkerDirCleanupMeta.stream().mapToLong(meta -> meta.deletedSize).sum();
+            final DeletionMeta globalLogCleanupMeta = globalLogCleanup(maxSumWorkerLogsSizeMb * 1024 * 1024);
+            numFilesCleaned += globalLogCleanupMeta.deletedFiles;
+            diskSpaceCleaned += globalLogCleanupMeta.deletedSize;
         } catch (Exception ex) {
+            ExceptionMeters.NUM_CLEANUP_EXCEPTIONS.mark();
             LOG.error("Exception while cleaning up old log.", ex);
         }
+        numFilesCleanedUp.update(numFilesCleaned);
+        diskSpaceFreed.update(diskSpaceCleaned);
     }

     /**
      * Delete the oldest files in each overloaded worker log dir.
      */
     @VisibleForTesting
-    List<Integer> perWorkerDirCleanup(long size) {
+    List<DeletionMeta> perWorkerDirCleanup(long size) {
         return workerLogs.getAllWorkerDirs().stream()
             .map(Unchecked.function(dir -> directoryCleaner.deleteOldestWhileTooLarge(Collections.singletonList(dir),
                 size, true, null)))
@@ -176,7 +198,7 @@ List<Integer> perWorkerDirCleanup(long size) {
     /**
      * Delete the oldest files in overloaded worker-artifacts globally.
     */
    @VisibleForTesting
-    int globalLogCleanup(long size) throws Exception {
+    DeletionMeta globalLogCleanup(long size) throws Exception {
         List<File> workerDirs = new ArrayList<>(workerLogs.getAllWorkerDirs());
         Set<String> aliveWorkerDirs = new HashSet<>(workerLogs.getAliveWorkerDirs());
@@ -223,8 +245,8 @@ Set<File> selectDirsForCleanup(long nowMillis) {
     @VisibleForTesting
     FileFilter mkFileFilterForLogCleanup(long nowMillis) {
-        final long cutoffAgeMillis = cleanupCutoffAgeMillis(nowMillis);
-        return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cutoffAgeMillis;
+        //Doesn't it make more sense to do file.isDirectory here?
+        return file -> !file.isFile() && lastModifiedTimeWorkerLogdir(file) <= cleanupCutoffAgeMillis(nowMillis);
     }

     /**
@@ -235,7 +257,7 @@ FileFilter mkFileFilterForLogCleanup(long nowMillis) {
     private long lastModifiedTimeWorkerLogdir(File logDir) {
         long dirModified = logDir.lastModified();

-        DirectoryStream<Path> dirStream = null;
+        DirectoryStream<Path> dirStream;
         try {
             dirStream = directoryCleaner.getStreamForDirectory(logDir);
         } catch (IOException e) {
@@ -256,9 +278,7 @@ private long lastModifiedTimeWorkerLogdir(File logDir) {
             LOG.error(ex.getMessage(), ex);
             return dirModified;
         } finally {
-            if (DirectoryStream.class.isInstance(dirStream)) {
-                IOUtils.closeQuietly(dirStream);
-            }
+            IOUtils.closeQuietly(dirStream);
         }
     }
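A detail worth noting in run() above: Timer.Context implements Closeable and records the elapsed time when closed, so the try-with-resources form times the whole cleanup pass even when the body throws. Reduced to its essentials, against a plain codahale registry for illustration:

    MetricRegistry registry = new MetricRegistry();
    Timer cleanupDuration = registry.timer("logviewer:cleanup-routine-duration-ms");
    try (Timer.Context ignored = cleanupDuration.time()) {
        // ... cleanup work; the duration is recorded when the context closes,
        // whether the body completes normally or throws.
    }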
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
index 67b265d1889..bfb30651179 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogFileDownloader.java
@@ -18,12 +18,19 @@
 package org.apache.storm.daemon.logviewer.utils;

+import com.codahale.metrics.Histogram;
+
 import java.io.File;
 import java.io.IOException;

 import javax.ws.rs.core.Response;

+import org.apache.commons.io.FileUtils;
+import org.apache.storm.metric.StormMetricsRegistry;
+
+
 public class LogFileDownloader {
+    private static final Histogram fileDownloadSizeDistMB = StormMetricsRegistry.registerHistogram("logviewer:download-file-size-rounded-MB");

     private final String logRoot;
     private final String daemonLogRoot;
@@ -55,6 +62,7 @@ public Response downloadFile(String fileName, String user, boolean isDaemon) thr
         File file = new File(rootDir, fileName).getCanonicalFile();
         if (file.exists()) {
             if (isDaemon || resourceAuthorizer.isUserAllowedToAccessFile(user, fileName)) {
+                fileDownloadSizeDistMB.update(Math.round((double) file.length() / FileUtils.ONE_MB));
                 return LogviewerResponseBuilder.buildDownloadFile(file);
             } else {
                 return LogviewerResponseBuilder.buildResponseUnauthorizedUser(user);
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
index b92a5595dff..4c8a1917034 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogviewerResponseBuilder.java
@@ -76,13 +76,18 @@ public static Response buildSuccessJsonResponse(Object entity, String callback,
      * @param file file to download
      */
     public static Response buildDownloadFile(File file) throws IOException {
-        // do not close this InputStream in method: it will be used from jetty server
-        InputStream is = new FileInputStream(file);
-        return Response.status(OK)
-            .entity(wrapWithStreamingOutput(is))
-            .type(MediaType.APPLICATION_OCTET_STREAM_TYPE)
-            .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"")
-            .build();
+        try {
+            // do not close this InputStream in method: it will be used from jetty server
+            InputStream is = new FileInputStream(file);
+            return Response.status(OK)
+                .entity(wrapWithStreamingOutput(is))
+                .type(MediaType.APPLICATION_OCTET_STREAM_TYPE)
+                .header("Content-Disposition", "attachment; filename=\"" + file.getName() + "\"")
+                .build();
+        } catch (IOException e) {
+            ExceptionMeters.NUM_FILE_DOWNLOAD_EXCEPTIONS.mark();
+            throw e;
+        }
     }

     /**
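The "do not close this InputStream" comment holds because the stream is handed to JAX-RS as a StreamingOutput and consumed by Jetty after buildDownloadFile returns. wrapWithStreamingOutput is not shown in this hunk; a plausible shape for such a wrapper (an assumption, not the patch's actual code) would be:

    static StreamingOutput wrapWithStreamingOutput(final InputStream is) {
        // The container invokes write(...) while serializing the response body;
        // the try-with-resources closes the file stream only after the copy finishes.
        return output -> {
            try (InputStream in = is) {
                IOUtils.copy(in, output);
            }
        };
    }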
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
index eda2478e528..d566e3d6c9a 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java
@@ -22,7 +22,6 @@
 import static java.util.stream.Collectors.toMap;
 import static org.apache.storm.Config.SUPERVISOR_RUN_WORKER_AS_USER;
 import static org.apache.storm.Config.TOPOLOGY_SUBMITTER_USER;
-import static org.apache.storm.daemon.utils.ListFunctionalSupport.takeLast;

 import com.google.common.collect.Lists;

@@ -88,9 +87,14 @@ public void setLogFilePermission(String fileName) throws IOException {
         if (runAsUser && topoOwner.isPresent() && file.exists() && !Files.isReadable(file.toPath())) {
             LOG.debug("Setting permissions on file {} with topo-owner {}", fileName, topoOwner);
-            ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
-                Lists.newArrayList("blob", file.getCanonicalPath()), null,
-                "setup group read permissions for file: " + fileName);
+            try {
+                ClientSupervisorUtils.processLauncherAndWait(stormConf, topoOwner.get(),
+                    Lists.newArrayList("blob", file.getCanonicalPath()), null,
+                    "setup group read permissions for file: " + fileName);
+            } catch (IOException e) {
+                ExceptionMeters.NUM_SET_PERMISSION_EXCEPTIONS.mark();
+                throw e;
+            }
         }
     }

@@ -127,7 +131,7 @@ public Set<File> getAllWorkerDirs() {

     /**
      * Return a sorted set of paths to logs that were written by workers that are now active.
      */
-    public SortedSet<String> getAliveWorkerDirs() throws Exception {
+    public SortedSet<String> getAliveWorkerDirs() {
         Set<String> aliveIds = getAliveIds(Time.currentTimeSecs());
         Set<File> logDirs = getAllWorkerDirs();
         Map<String, File> idToDir = identifyWorkerLogDirs(logDirs);
@@ -177,7 +181,7 @@ public String getTopologyOwnerFromMetadataFile(String metaFile) {
      *
      * @param nowSecs current time in seconds
      */
-    public Set<String> getAliveIds(int nowSecs) throws Exception {
+    public Set<String> getAliveIds(int nowSecs) {
         return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream()
             .filter(entry -> Objects.nonNull(entry.getValue())
                 && !SupervisorUtils.isWorkerHbTimedOut(nowSecs, entry.getValue(), stormConf))
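setLogFilePermission shows the mark-and-rethrow idiom this patch applies at each instrumented call site (LogviewerResource below repeats it per endpoint). If the duplication grew further, a small helper could centralize it; this is only a sketch, not something the patch introduces:

    static <T> T meterIoFailures(ExceptionMeters meter, Callable<T> action) throws IOException {
        try {
            return action.call();
        } catch (IOException e) {
            meter.mark();   // count the failure, then propagate it unchanged
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e); // Callable declares Exception; narrow the rest
        }
    }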
diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
index 85285ac38d5..08881be3a70 100644
--- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
+++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/webapp/LogviewerResource.java
@@ -19,6 +19,7 @@
 package org.apache.storm.daemon.logviewer.webapp;

 import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;

 import java.io.IOException;
 import java.net.URLDecoder;
@@ -38,6 +39,7 @@
 import org.apache.storm.daemon.logviewer.handler.LogviewerLogPageHandler;
 import org.apache.storm.daemon.logviewer.handler.LogviewerLogSearchHandler;
 import org.apache.storm.daemon.logviewer.handler.LogviewerProfileHandler;
+import org.apache.storm.daemon.logviewer.utils.ExceptionMeters;
 import org.apache.storm.daemon.ui.InvalidRequestException;
 import org.apache.storm.daemon.ui.UIHelpers;
 import org.apache.storm.daemon.ui.resources.StormApiResource;
@@ -62,6 +64,14 @@ public class LogviewerResource {
         "logviewer:num-download-log-daemon-file-http-requests");
     private static final Meter meterListLogsHttpRequests = StormMetricsRegistry.registerMeter("logviewer:num-list-logs-http-requests");
+    private static final Meter numSearchLogRequests = StormMetricsRegistry.registerMeter("logviewer:num-search-logs-requests");
+    private static final Meter numDeepSearchArchived = StormMetricsRegistry.registerMeter(
+        "logviewer:num-deep-search-requests-with-archived");
+    private static final Meter numDeepSearchNonArchived = StormMetricsRegistry.registerMeter(
+        "logviewer:num-deep-search-requests-without-archived");
+    private static final Timer searchLogRequestDuration = StormMetricsRegistry.registerTimer("logviewer:search-requests-duration-ms");
+    private static final Timer deepSearchRequestDuration = StormMetricsRegistry.registerTimer("logviewer:deep-search-request-duration-ms");
+
     private final LogviewerLogPageHandler logviewer;
     private final LogviewerProfileHandler profileHandler;
     private final LogviewerLogDownloadHandler logDownloadHandler;
@@ -105,6 +115,9 @@ public Response log(@Context HttpServletRequest request) throws IOException {
         } catch (InvalidRequestException e) {
             LOG.error(e.getMessage(), e);
             return Response.status(400).entity(e.getMessage()).build();
+        } catch (IOException e) {
+            ExceptionMeters.NUM_READ_LOG_EXCEPTIONS.mark();
+            throw e;
         }
     }

@@ -126,6 +139,9 @@ public Response daemonLog(@Context HttpServletRequest request) throws IOExceptio
         } catch (InvalidRequestException e) {
             LOG.error(e.getMessage(), e);
             return Response.status(400).entity(e.getMessage()).build();
+        } catch (IOException e) {
+            ExceptionMeters.NUM_READ_DAEMON_LOG_EXCEPTIONS.mark();
+            throw e;
         }
     }

@@ -158,7 +174,12 @@ public Response listLogs(@Context HttpServletRequest request) throws IOException
         String callback = request.getParameter(StormApiResource.callbackParameterName);
         String origin = request.getHeader("Origin");

-        return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId, callback, origin);
+        try {
+            return logviewer.listLogFiles(user, portStr != null ? Integer.parseInt(portStr) : null, topologyId, callback, origin);
+        } catch (IOException e) {
+            ExceptionMeters.NUM_LIST_LOG_EXCEPTIONS.mark();
+            throw e;
+        }
     }

     /**
@@ -169,7 +190,12 @@ public Response listDumpFiles(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
                                   @Context HttpServletRequest request) throws IOException {
         String user = httpCredsHandler.getUserName(request);
-        return profileHandler.listDumpFiles(topologyId, hostPort, user);
+        try {
+            return profileHandler.listDumpFiles(topologyId, hostPort, user);
+        } catch (IOException e) {
+            ExceptionMeters.NUM_LIST_DUMP_EXCEPTIONS.mark();
+            throw e;
+        }
     }

     /**
@@ -180,7 +206,12 @@ public Response downloadDumpFile(@PathParam("topo-id") String topologyId, @PathParam("host-port") String hostPort,
                                      @PathParam("filename") String fileName, @Context HttpServletRequest request) throws IOException {
         String user = httpCredsHandler.getUserName(request);
-        return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
+        try {
+            return profileHandler.downloadDumpFile(topologyId, hostPort, fileName, user);
+        } catch (IOException e) {
+            ExceptionMeters.NUM_DOWNLOAD_DUMP_EXCEPTIONS.mark();
+            throw e;
+        }
     }

     /**
@@ -194,7 +225,12 @@ public Response downloadLogFile(@Context HttpServletRequest request) throws IOEx
         String user = httpCredsHandler.getUserName(request);
         String file = request.getParameter("file");
         String decodedFileName = URLDecoder.decode(file);
-        return logDownloadHandler.downloadLogFile(decodedFileName, user);
+        try {
+            return logDownloadHandler.downloadLogFile(decodedFileName, user);
+        } catch (IOException e) {
+            ExceptionMeters.NUM_DOWNLOAD_LOG_EXCEPTIONS.mark();
+            throw e;
+        }
     }

     /**
@@ -208,7 +244,12 @@ public Response downloadDaemonLogFile(@Context HttpServletRequest request) throw
         String user = httpCredsHandler.getUserName(request);
         String file = request.getParameter("file");
         String decodedFileName = URLDecoder.decode(file);
-        return logDownloadHandler.downloadDaemonLogFile(decodedFileName, user);
+        try {
+            return logDownloadHandler.downloadDaemonLogFile(decodedFileName, user);
+        } catch (IOException e) {
+            ExceptionMeters.NUM_DOWNLOAD_DAEMON_LOG_EXCEPTIONS.mark();
+            throw e;
+        }
     }

     /**
@@ -217,6 +258,8 @@ public Response downloadDaemonLogFile(@Context HttpServletRequest request) throw
     @GET
     @Path("/search")
     public Response search(@Context HttpServletRequest request) throws IOException {
+        numSearchLogRequests.mark();
+
         String user = httpCredsHandler.getUserName(request);
         boolean isDaemon = StringUtils.equals(request.getParameter("is-daemon"), "yes");
         String file = request.getParameter("file");
@@ -227,14 +270,17 @@ public Response search(@Context HttpServletRequest request) throws IOException {
         String callback = request.getParameter(StormApiResource.callbackParameterName);
         String origin = request.getHeader("Origin");

-        try {
-            return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon, searchString, numMatchesStr,
-                startByteOffset, callback, origin);
+        try (Timer.Context t = searchLogRequestDuration.time()) {
+            return logSearchHandler.searchLogFile(decodedFileName, user, isDaemon,
+                searchString, numMatchesStr, startByteOffset, callback, origin);
         } catch (InvalidRequestException e) {
             LOG.error(e.getMessage(), e);
             int statusCode = 400;
             return new JsonResponseBuilder().setData(UIHelpers.exceptionToJson(e, statusCode)).setCallback(callback)
-                .setStatus(statusCode).build();
+                .setStatus(statusCode).build();
+        } catch (IOException e) {
+            ExceptionMeters.NUM_SEARCH_EXCEPTIONS.mark();
+            throw e;
         }
     }

@@ -244,7 +290,7 @@ public Response search(@Context HttpServletRequest request) throws IOException {
     @GET
     @Path("/deepSearch/{topoId}")
     public Response deepSearch(@PathParam("topoId") String topologyId,
-                               @Context HttpServletRequest request) throws IOException {
+                               @Context HttpServletRequest request) {
         String user = httpCredsHandler.getUserName(request);
         String searchString = request.getParameter("search-string");
         String numMatchesStr = request.getParameter("num-matches");
@@ -255,8 +301,16 @@ public Response deepSearch(@PathParam("topoId") String topologyId,
         String callback = request.getParameter(StormApiResource.callbackParameterName);
         String origin = request.getHeader("Origin");

-        return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr,
-            startFileOffset, startByteOffset, BooleanUtils.toBooleanObject(searchArchived), callback, origin);
+        Boolean alsoSearchArchived = BooleanUtils.toBooleanObject(searchArchived);
+        if (BooleanUtils.isTrue(alsoSearchArchived)) {
+            numDeepSearchArchived.mark();
+        } else {
+            numDeepSearchNonArchived.mark();
+        }
+        try (Timer.Context t = deepSearchRequestDuration.time()) {
+            return logSearchHandler.deepSearchLogsForTopology(topologyId, user, searchString, numMatchesStr, portStr, startFileOffset,
+                startByteOffset, alsoSearchArchived, callback, origin);
+        }
     }

     private int parseIntegerFromMap(Map map, String parameterKey) throws InvalidRequestException {
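One subtlety in how deepSearch splits the archived/non-archived counters: BooleanUtils.toBooleanObject returns null for a missing or unparseable parameter, and BooleanUtils.isTrue(null) is false, so such requests are counted as non-archived. In isolation (commons-lang3 shown; the 2.x class behaves the same):

    Boolean alsoSearchArchived = BooleanUtils.toBooleanObject(null); // parameter absent -> null
    if (BooleanUtils.isTrue(alsoSearchArchived)) {
        numDeepSearchArchived.mark();
    } else {
        numDeepSearchNonArchived.mark(); // null and false-like values both land here
    }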
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
index c7a1fd8455f..0a450be80b6 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/handler/LogviewerLogSearchHandlerTest.java
@@ -627,6 +627,7 @@ public void testFindNMatches() {

     public static class TestDeepSearchLogs {

+        public static final int METRIC_SCANNED_FILES = 0;
         private List<File> logFiles;
         private String topoPath;
@@ -857,7 +858,7 @@ private LogviewerLogSearchHandler getStubbedSearchHandler() {
             int fileOffset = (Integer) arguments[2];
             String search = (String) arguments[4];

-            return new LogviewerLogSearchHandler.Matched(fileOffset, search, Collections.emptyList());
+            return new LogviewerLogSearchHandler.Matched(fileOffset, search, Collections.emptyList(), METRIC_SCANNED_FILES);
         }).when(handler).findNMatches(any(), anyInt(), anyInt(), anyInt(), any());

         return handler;
diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
index 491de542fa3..8b1c0b46045 100644
--- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
+++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java
@@ -159,7 +159,10 @@ public void testPerWorkerDirectoryCleanup() throws IOException {
             WorkerLogs workerLogs = new WorkerLogs(conf, rootDir);

             LogCleaner logCleaner = new LogCleaner(conf, workerLogs, mockDirectoryCleaner, rootDir);
-            List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(1200);
+            List<Integer> deletedFiles = logCleaner.perWorkerDirCleanup(1200)
+                .stream()
+                .map(deletionMeta -> deletionMeta.deletedFiles)
+                .collect(toList());
             assertEquals(Integer.valueOf(4), deletedFiles.get(0));
             assertEquals(Integer.valueOf(4), deletedFiles.get(1));
             assertEquals(Integer.valueOf(4), deletedFiles.get(deletedFiles.size() - 1));
@@ -218,13 +221,13 @@ public void testGlobalLogCleanup() throws Exception {
             Map<String, Object> conf = Utils.readStormConfig();
             WorkerLogs stubbedWorkerLogs = new WorkerLogs(conf, rootDir) {
                 @Override
-                public SortedSet<String> getAliveWorkerDirs() throws Exception {
+                public SortedSet<String> getAliveWorkerDirs() {
                     return new TreeSet<>(Collections.singletonList("/workers-artifacts/topo1/port1"));
                 }
             };

             LogCleaner logCleaner = new LogCleaner(conf, stubbedWorkerLogs, mockDirectoryCleaner, rootDir);
-            int deletedFiles = logCleaner.globalLogCleanup(2400);
+            int deletedFiles = logCleaner.globalLogCleanup(2400).deletedFiles;
             assertEquals(18, deletedFiles);
         } finally {
             Utils.setInstance(prevUtils);