From 67fdd91244bcfbbdfd2f03d193da687a967d5c6e Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Mon, 20 Feb 2017 19:09:15 +0800 Subject: [PATCH 1/8] add accepted app stream --- .../hadoop/queue/HadoopQueueRunningApp.java | 29 +++-- .../queue/common/HadoopClusterConstants.java | 25 ++--- .../common/YarnClusterResourceURLBuilder.java | 14 ++- .../crawler/ClusterMetricsParseListener.java | 6 +- .../crawler/RunningAppParseListener.java | 46 +++++--- .../queue/crawler/RunningAppsCrawler.java | 5 +- .../crawler/SchedulerInfoParseListener.java | 8 +- .../hadoop/queue/model/applications/App.java | 19 ++++ .../model/applications/AppStreamInfo.java | 53 +++++++++ .../model/scheduler/QueueStreamInfo.java | 79 ++++++++++++++ .../storm/HadoopQueueMetricPersistBolt.java | 101 ++++++------------ .../queue/storm/HadoopQueueRunningSpout.java | 4 +- ...op.queue.HadoopQueueRunningAppProvider.xml | 53 ++++++++- 13 files changed, 323 insertions(+), 119 deletions(-) create mode 100644 eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java create mode 100644 eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java index 68ca8c753f..877c503f6d 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -17,24 +17,24 @@ package org.apache.eagle.hadoop.queue; import backtype.storm.generated.StormTopology; -import backtype.storm.topology.BoltDeclarer; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.TopologyBuilder; import com.typesafe.config.Config; import org.apache.eagle.app.StormApplication; import org.apache.eagle.app.environment.impl.StormEnvironment; import org.apache.eagle.app.messaging.StormStreamSink; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMetricPersistBolt; import org.apache.eagle.hadoop.queue.storm.HadoopQueueRunningSpout; +import java.util.HashMap; +import java.util.Map; + public class HadoopQueueRunningApp extends StormApplication { public StormTopology execute(Config config, StormEnvironment environment) { HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config); - IRichSpout spout = new HadoopQueueRunningSpout(appConfig); - HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig); - TopologyBuilder builder = new TopologyBuilder(); - int numOfPersistTasks = appConfig.topology.numPersistTasks; int numOfSinkTasks = appConfig.topology.numSinkTasks; int numOfSpoutTasks = 1; @@ -42,12 +42,27 @@ public StormTopology execute(Config config, StormEnvironment environment) { String spoutName = "runningQueueSpout"; String persistBoltName = "persistBolt"; + IRichSpout spout = new HadoopQueueRunningSpout(appConfig); + Map streamMaps = new HashMap<>(); + + String acceptedAppStreamId = persistBoltName + "-to-" + DataSource.RUNNING_APPS.toString(); + String schedulerStreamId = persistBoltName + "-to-" + DataSource.SCHEDULER.toString(); + streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId); + streamMaps.put(DataSource.SCHEDULER, schedulerStreamId); + + HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps); + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout(spoutName, spout, numOfSpoutTasks).setNumTasks(numOfSpoutTasks); builder.setBolt(persistBoltName, bolt, numOfPersistTasks).setNumTasks(numOfPersistTasks).shuffleGrouping(spoutName); - StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_LEAF_QUEUE_STREAM", config); + StormStreamSink queueSinkBolt = environment.getStreamSink("HADOOP_QUEUE_STREAM", config); builder.setBolt("queueKafkaSink", queueSinkBolt, numOfSinkTasks) - .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName); + .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, schedulerStreamId); + + StormStreamSink appSinkBolt = environment.getStreamSink("ACCEPTED_APP_STREAM", config); + builder.setBolt("appKafkaSink", appSinkBolt, numOfSinkTasks) + .setNumTasks(numOfSinkTasks).shuffleGrouping(persistBoltName, acceptedAppStreamId); return builder.createTopology(); } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 1d64f8789e..17500197f2 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java @@ -24,13 +24,17 @@ public enum AggregateFunc { } public enum DataType { - METRIC, ENTITY + METRIC, ENTITY, STREAM } public enum DataSource { CLUSTER_METRIC, RUNNING_APPS, SCHEDULER } + public enum AppState { + RUNNING, ACCEPTED + } + public static class MetricName { // Metrics from running apps @@ -61,24 +65,6 @@ public static class MetricName { } - public static class LeafQueueInfo { - public static final String TIMESTAMP = "timestamp"; - public static final String QUEUE_SITE = "site"; - public static final String QUEUE_NAME = "queue"; - public static final String QUEUE_STATE = "state"; - public static final String QUEUE_SCHEDULER = "scheduler"; - public static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity"; - public static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity"; - public static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity"; - public static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity"; - public static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity"; - public static final String QUEUE_USED_MEMORY = "memory"; - public static final String QUEUE_USED_VCORES = "vcores"; - public static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications"; - public static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications"; - public static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications"; - } - public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService"; @@ -90,6 +76,7 @@ public static class LeafQueueInfo { public static final String TAG_CLUSTER = "cluster"; // field constants + public static final String FIELD_DATASOURCE = "dataSource"; public static final String FIELD_DATATYPE = "dataType"; public static final String FIELD_DATA = "data"; diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java index 0ee4318cfa..a13ddc43c8 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java @@ -36,7 +36,19 @@ public static String buildClusterMetricsURL(String urlBase) { } public static String buildRunningAppsURL(String urlBase) { - return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING" + "&" + ANONYMOUS_PARAMETER); + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?state=RUNNING&" + ANONYMOUS_PARAMETER); + } + + public static String buildAcceptedAndRunningAppsURL(String urlBase) { + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED,RUNNING&" + ANONYMOUS_PARAMETER); + } + + public static String buildAcceptedAppsURL(String urlBase) { + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "?states=ACCEPTED&" + ANONYMOUS_PARAMETER); + } + + public static String buildAcceptedAppTrackingURL(String urlBase, String appId) { + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "/" + appId + ANONYMOUS_PARAMETER); } public static String buildFinishedAppsURL(String urlBase) { diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java index d3219ef678..2ecc34f40e 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java @@ -70,9 +70,7 @@ private void createMetric(String metricName, long timestamp, double value, Hadoo entity.setValue(new double[] {0.0}); clusterMetricEntities.put(key, entity); } - if (clusterMetricCounts.get(key) == null) { - clusterMetricCounts.put(key, 0); - } + clusterMetricCounts.putIfAbsent(key, 0); updateEntityAggValue(entity, aggFunc, value, clusterMetricCounts.get(key)); clusterMetricCounts.put(key, clusterMetricCounts.get(key) + 1); } @@ -89,7 +87,7 @@ public void onMetric(ClusterMetrics metrics, long currentTimestamp) { public void flush() { HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis()); List metrics = new ArrayList<>(clusterMetricEntities.values()); - this.collector.emit(new ValuesArray(DataType.METRIC.name(), metrics), messageId); + this.collector.emit(new ValuesArray(DataSource.CLUSTER_METRIC.name(), DataType.METRIC.name(), metrics), messageId); reset(); } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index 364a1a7b1f..202ea1abda 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -25,6 +25,9 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; +import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder; import org.apache.eagle.hadoop.queue.model.applications.App; import org.apache.eagle.hadoop.queue.model.applications.Apps; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; @@ -54,19 +57,29 @@ public class RunningAppParseListener { }; private String site; + private String rmUrl; private SpoutOutputCollector collector; private Map appMetricEntities = new HashMap<>(); + private List acceptedApps = new ArrayList<>(); - public RunningAppParseListener(String site, SpoutOutputCollector collector) { + public RunningAppParseListener(String site, SpoutOutputCollector collector, String rmUrl) { this.site = site; + this.rmUrl = rmUrl; this.collector = collector; } public void flush() { - logger.info("start sending app metrics, size: " + appMetricEntities.size()); - HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.RUNNING_APPS, System.currentTimeMillis()); + logger.info("crawled {} running app metrics", appMetricEntities.size()); + HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.RUNNING_APPS, System.currentTimeMillis()); List metrics = new ArrayList<>(appMetricEntities.values()); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC.name(), metrics), messageId); + + logger.info("crawled {} accepted apps", acceptedApps.size()); + messageId = new HadoopQueueMessageId(DataType.STREAM, DataSource.RUNNING_APPS, System.currentTimeMillis()); + List entities = new ArrayList<>(acceptedApps); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS.name(), DataType.STREAM.name(), entities), messageId); + + acceptedApps.clear(); appMetricEntities.clear(); } @@ -97,16 +110,21 @@ private void createMetric(String metricName, Map tags, long time public void onMetric(Apps apps, long timestamp) throws Exception { timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL; for (App app : apps.getApp()) { - Map tags = new HashMap<>(); - tags.put(HadoopClusterConstants.TAG_USER, app.getUser()); - tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue()); - for (AggLevel level : AggLevel.values()) { - Map newTags = buildMetricTags(level, tags); - for (java.util.Map.Entry entry : metrics.entrySet()) { - Method method = App.class.getMethod(entry.getValue()); - Integer value = (Integer) method.invoke(app); - String metricName = String.format(entry.getKey(), level.name); - createMetric(metricName, newTags, timestamp, value); + if (app.getState().equalsIgnoreCase(HadoopClusterConstants.AppState.ACCEPTED.toString())) { + app.setTrackingUrl(YarnClusterResourceURLBuilder.buildAcceptedAppTrackingURL(rmUrl, app.getId())); + acceptedApps.add(app); + } else { + Map tags = new HashMap<>(); + tags.put(HadoopClusterConstants.TAG_USER, app.getUser()); + tags.put(HadoopClusterConstants.TAG_QUEUE, app.getQueue()); + for (AggLevel level : AggLevel.values()) { + Map newTags = buildMetricTags(level, tags); + for (java.util.Map.Entry entry : metrics.entrySet()) { + Method method = App.class.getMethod(entry.getValue()); + Integer value = (Integer) method.invoke(app); + String metricName = String.format(entry.getKey(), level.name); + createMetric(metricName, newTags, timestamp, value); + } } } } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java index 3ffd371d1b..39eec80c49 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppsCrawler.java @@ -36,9 +36,10 @@ public class RunningAppsCrawler implements Runnable { private String urlString; public RunningAppsCrawler(String site, String baseUrl, SpoutOutputCollector collector) { - this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl); + this.urlString = YarnClusterResourceURLBuilder.buildAcceptedAndRunningAppsURL(baseUrl); + //this.urlString = YarnClusterResourceURLBuilder.buildRunningAppsURL(baseUrl); //this.urlString = YarnClusterResourceURLBuilder.buildFinishedAppsURL(baseUrl); - listener = new RunningAppParseListener(site, collector); + listener = new RunningAppParseListener(site, collector, baseUrl); } @Override diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 67cc5c9174..1cc5abcafa 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -21,6 +21,8 @@ import org.apache.eagle.dataproc.impl.storm.ValuesArray; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; import org.apache.eagle.hadoop.queue.model.scheduler.*; import org.apache.eagle.hadoop.queue.model.scheduler.Queue; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; @@ -68,12 +70,12 @@ public void flush() { LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size()); HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); List metrics = new ArrayList<>(metricEntities); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.METRIC.name(), metrics), messageId); + collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.METRIC.name(), metrics), messageId); LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size()); - messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); + messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.SCHEDULER, System.currentTimeMillis()); List entities = new ArrayList<>(runningQueueAPIEntities); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId); + collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.ENTITY.name(), entities), messageId); runningQueueAPIEntities.clear(); metricEntities.clear(); diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java index b1cbb42611..393ede3cff 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/App.java @@ -51,6 +51,9 @@ public class App { private int allocatedMB; private int allocatedVCores; private int runningContainers; + // for HDP 2.7 + private double queueUsagePercentage; + private double clusterUsagePercentage; public String getId() { return id; @@ -219,4 +222,20 @@ public int getRunningContainers() { public void setRunningContainers(int runningContainers) { this.runningContainers = runningContainers; } + + public double getQueueUsagePercentage() { + return queueUsagePercentage; + } + + public void setQueueUsagePercentage(double queueUsagePercentage) { + this.queueUsagePercentage = queueUsagePercentage; + } + + public double getClusterUsagePercentage() { + return clusterUsagePercentage; + } + + public void setClusterUsagePercentage(double clusterUsagePercentage) { + this.clusterUsagePercentage = clusterUsagePercentage; + } } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java new file mode 100644 index 0000000000..00fceb9932 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.applications; + +import java.util.HashMap; +import java.util.Map; + +public class AppStreamInfo { + private static final String SITE = "site"; + private static final String ID = "id"; + private static final String USER = "user"; + private static final String NAME = "appName"; + private static final String QUEUE = "queue"; + private static final String STATE = "state"; + private static final String STARTEDTIME = "startTime"; + private static final String ELAPSEDTIME = "elapsedTime"; + private static final String QUEUE_USAGE_PERCENTAGE = "queueUsagePercentage"; + private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage"; + private static final String TRACKING_URL = "trackingUrl"; + + public static Map convertAppToStream(App app, String site) { + Map queueStreamInfo = new HashMap<>(); + queueStreamInfo.put(SITE, site); + queueStreamInfo.put(ID, app.getId()); + queueStreamInfo.put(USER, app.getUser()); + queueStreamInfo.put(NAME, app.getName()); + queueStreamInfo.put(QUEUE, app.getQueue()); + queueStreamInfo.put(STATE, app.getState()); + queueStreamInfo.put(ELAPSEDTIME, app.getElapsedTime()); + queueStreamInfo.put(STARTEDTIME, app.getStartedTime()); + queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, app.getQueueUsagePercentage()); + queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, app.getClusterUsagePercentage()); + queueStreamInfo.put(TRACKING_URL, app.getTrackingUrl()); + + return queueStreamInfo; + } + +} diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java new file mode 100644 index 0000000000..af06b275e8 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStreamInfo.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.scheduler; + +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; + +import java.util.HashMap; +import java.util.Map; + +public class QueueStreamInfo { + private static final String TIMESTAMP = "timestamp"; + private static final String QUEUE_SITE = "site"; + public static final String QUEUE_NAME = "queue"; + private static final String QUEUE_STATE = "state"; + private static final String QUEUE_SCHEDULER = "scheduler"; + private static final String QUEUE_ABSOLUTE_CAPACITY = "absoluteCapacity"; + private static final String QUEUE_ABSOLUTE_MAX_CAPACITY = "absoluteMaxCapacity"; + private static final String QUEUE_ABSOLUTE_USED_CAPACITY = "absoluteUsedCapacity"; + private static final String QUEUE_MAX_USER_USED_CAPACITY = "maxUserUsedCapacity"; + private static final String QUEUE_USER_LIMIT_CAPACITY = "userLimitCapacity"; + private static final String QUEUE_USED_MEMORY = "memory"; + private static final String QUEUE_USED_VCORES = "vcores"; + private static final String QUEUE_NUM_ACTIVE_APPS = "numActiveApplications"; + private static final String QUEUE_NUM_PENDING_APPS = "numPendingApplications"; + private static final String QUEUE_MAX_ACTIVE_APPS = "maxActiveApplications"; + + + public static Map convertEntityToStream(RunningQueueAPIEntity queueAPIEntity) { + Map queueInfoMap = new HashMap<>(); + queueInfoMap.put(QueueStreamInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE)); + queueInfoMap.put(QueueStreamInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE)); + queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity()); + queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity()); + queueInfoMap.put(QueueStreamInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity()); + queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications()); + queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications()); + queueInfoMap.put(QueueStreamInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications()); + queueInfoMap.put(QueueStreamInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler()); + queueInfoMap.put(QueueStreamInfo.QUEUE_STATE, queueAPIEntity.getState()); + queueInfoMap.put(QueueStreamInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory()); + queueInfoMap.put(QueueStreamInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores()); + queueInfoMap.put(QueueStreamInfo.TIMESTAMP, queueAPIEntity.getTimestamp()); + + double maxUserUsedCapacity = 0; + double userUsedCapacity; + for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) { + userUsedCapacity = calculateUserUsedCapacity( + queueAPIEntity.getAbsoluteUsedCapacity(), + queueAPIEntity.getMemory(), + user.getMemory()); + if (userUsedCapacity > maxUserUsedCapacity) { + maxUserUsedCapacity = userUsedCapacity; + } + + } + queueInfoMap.put(QueueStreamInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity); + queueInfoMap.put(QueueStreamInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity()); + return queueInfoMap; + } + + private static double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) { + return userUsedMem * absoluteUsedCapacity / queueUsedMem; + } +} diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 9eb7008748..57fe0e8837 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -27,11 +27,11 @@ import backtype.storm.tuple.Values; import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; -import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo; +import org.apache.eagle.hadoop.queue.model.applications.App; +import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo; +import org.apache.eagle.hadoop.queue.model.scheduler.QueueStreamInfo; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; -import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.IEagleServiceClient; import org.apache.eagle.service.client.impl.EagleServiceClientImpl; @@ -39,7 +39,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -47,18 +46,25 @@ public class HadoopQueueMetricPersistBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(HadoopQueueMetricPersistBolt.class); + private Map streamMap; private HadoopQueueRunningAppConfig config; private IEagleServiceClient client; private OutputCollector collector; - public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config) { + public HadoopQueueMetricPersistBolt(HadoopQueueRunningAppConfig config, + Map streamMap) { this.config = config; + this.streamMap = streamMap; } @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { HadoopQueueRunningAppConfig.EagleProps.EagleService eagleService = config.eagleProps.eagleService; - this.client = new EagleServiceClientImpl(eagleService.host, eagleService.port, eagleService.username, eagleService.password); + this.client = new EagleServiceClientImpl( + eagleService.host, + eagleService.port, + eagleService.username, + eagleService.password); this.collector = collector; } @@ -67,30 +73,41 @@ public void execute(Tuple input) { if (input == null) { return; } + String dataSource = input.getStringByField(HadoopClusterConstants.FIELD_DATASOURCE); String dataType = input.getStringByField(HadoopClusterConstants.FIELD_DATATYPE); Object data = input.getValueByField(HadoopClusterConstants.FIELD_DATA); - if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.METRIC.toString())) { - List metrics = (List) data; - writeMetrics(metrics); - } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) { + + if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.STREAM.toString())) { + List apps = (List) data; + for (App app : apps) { + collector.emit(streamMap.get(dataSource), new Values(app.getId(), + AppStreamInfo.convertAppToStream(app, config.eagleProps.site))); + } + } else { List entities = (List) data; for (TaggedLogAPIEntity entity : entities) { if (entity instanceof RunningQueueAPIEntity) { RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { - collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), - parseLeafQueueInfo(queue))); + String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE); + collector.emit(streamMap.get(dataSource), new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); } } } - writeEntities(entities); + writeEntities(entities, dataType, dataSource); } this.collector.ack(input); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(HadoopClusterConstants.LeafQueueInfo.QUEUE_NAME, "message")); + if (streamMap != null) { + for (String stormStreamId : streamMap.values()) { + declarer.declareStream(stormStreamId, new Fields("f1", "message")); + } + } else { + declarer.declare(new Fields("f1", "message")); + } } @Override @@ -104,67 +121,17 @@ public void cleanup() { } } - private void writeEntities(List entities) { + private void writeEntities(List entities, String dataType, String dataSource) { try { GenericServiceAPIResponseEntity response = client.create(entities); if (!response.isSuccess()) { LOG.error("Got exception from eagle service: " + response.getException()); } else { - LOG.info("Successfully wrote " + entities.size() + " RunningQueueAPIEntity entities"); + LOG.info("Successfully wrote {} items of {} for {}", entities.size(), dataType, dataSource); } } catch (Exception e) { - LOG.error("cannot create running queue entities successfully", e); + LOG.error("cannot create {} entities", entities.size(), e); } entities.clear(); } - - private void writeMetrics(List entities) { - try { - GenericServiceAPIResponseEntity response = client.create(entities); - if (response.isSuccess()) { - LOG.info("Successfully wrote " + entities.size() + " GenericMetricEntity entities"); - } else { - LOG.error(response.getException()); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - } - - private Map parseLeafQueueInfo(RunningQueueAPIEntity queueAPIEntity) { - Map queueInfoMap = new HashMap<>(); - queueInfoMap.put(LeafQueueInfo.QUEUE_SITE, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_SITE)); - queueInfoMap.put(LeafQueueInfo.QUEUE_NAME, queueAPIEntity.getTags().get(HadoopClusterConstants.TAG_QUEUE)); - queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_CAPACITY, queueAPIEntity.getAbsoluteCapacity()); - queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_MAX_CAPACITY, queueAPIEntity.getAbsoluteMaxCapacity()); - queueInfoMap.put(LeafQueueInfo.QUEUE_ABSOLUTE_USED_CAPACITY, queueAPIEntity.getAbsoluteUsedCapacity()); - queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_ACTIVE_APPS, queueAPIEntity.getMaxActiveApplications()); - queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_ACTIVE_APPS, queueAPIEntity.getNumActiveApplications()); - queueInfoMap.put(LeafQueueInfo.QUEUE_NUM_PENDING_APPS, queueAPIEntity.getNumPendingApplications()); - queueInfoMap.put(LeafQueueInfo.QUEUE_SCHEDULER, queueAPIEntity.getScheduler()); - queueInfoMap.put(LeafQueueInfo.QUEUE_STATE, queueAPIEntity.getState()); - queueInfoMap.put(LeafQueueInfo.QUEUE_USED_MEMORY, queueAPIEntity.getMemory()); - queueInfoMap.put(LeafQueueInfo.QUEUE_USED_VCORES, queueAPIEntity.getVcores()); - queueInfoMap.put(LeafQueueInfo.TIMESTAMP, queueAPIEntity.getTimestamp()); - - double maxUserUsedCapacity = 0; - double userUsedCapacity; - for (UserWrapper user : queueAPIEntity.getUsers().getUsers()) { - userUsedCapacity = calculateUserUsedCapacity( - queueAPIEntity.getAbsoluteUsedCapacity(), - queueAPIEntity.getMemory(), - user.getMemory()); - if (userUsedCapacity > maxUserUsedCapacity) { - maxUserUsedCapacity = userUsedCapacity; - } - - } - queueInfoMap.put(LeafQueueInfo.QUEUE_MAX_USER_USED_CAPACITY, maxUserUsedCapacity); - queueInfoMap.put(LeafQueueInfo.QUEUE_USER_LIMIT_CAPACITY, queueAPIEntity.getUserLimitFactor() * queueAPIEntity.getAbsoluteCapacity()); - return queueInfoMap; - } - - private double calculateUserUsedCapacity(double absoluteUsedCapacity, long queueUsedMem, long userUsedMem) { - return userUsedMem * absoluteUsedCapacity / queueUsedMem; - } } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java index 530be9a45d..681f25e2c3 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueRunningSpout.java @@ -51,7 +51,9 @@ public HadoopQueueRunningSpout(HadoopQueueRunningAppConfig config) { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATATYPE, HadoopClusterConstants.FIELD_DATA)); + declarer.declare(new Fields(HadoopClusterConstants.FIELD_DATASOURCE, + HadoopClusterConstants.FIELD_DATATYPE, + HadoopClusterConstants.FIELD_DATA)); } @Override diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index 5fb041d37a..c91e272aae 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -106,7 +106,7 @@ - HADOOP_LEAF_QUEUE_STREAM + HADOOP_QUEUE_STREAM Hadoop Leaf Queue Info Stream true @@ -172,6 +172,57 @@ + + ACCEPTED_APP_STREAM + Accepted App Info Stream + true + + + id + string + + + site + string + + + appName + string + + + queue + string + + + state + string + + + user + string + + + trackingUrl + string + + + elapsedTime + long + + + startedTime + long + + + queueUsagePercentage + double + + + clusterUsagePercentage + double + + + From c8cdd3d8309d2f5b28295f3527aa242b529a5642 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 10:03:00 +0800 Subject: [PATCH 2/8] fix code check warnings --- .../eagle/hadoop/queue/HadoopQueueRunningApp.java | 8 ++++---- .../queue/storm/HadoopQueueMetricPersistBolt.java | 13 ++++++++----- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java index 877c503f6d..4708baa75f 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/HadoopQueueRunningApp.java @@ -35,10 +35,6 @@ public class HadoopQueueRunningApp extends StormApplication { public StormTopology execute(Config config, StormEnvironment environment) { HadoopQueueRunningAppConfig appConfig = new HadoopQueueRunningAppConfig(config); - int numOfPersistTasks = appConfig.topology.numPersistTasks; - int numOfSinkTasks = appConfig.topology.numSinkTasks; - int numOfSpoutTasks = 1; - String spoutName = "runningQueueSpout"; String persistBoltName = "persistBolt"; @@ -50,6 +46,10 @@ public StormTopology execute(Config config, StormEnvironment environment) { streamMaps.put(DataSource.RUNNING_APPS, acceptedAppStreamId); streamMaps.put(DataSource.SCHEDULER, schedulerStreamId); + int numOfPersistTasks = appConfig.topology.numPersistTasks; + int numOfSinkTasks = appConfig.topology.numSinkTasks; + int numOfSpoutTasks = 1; + HadoopQueueMetricPersistBolt bolt = new HadoopQueueMetricPersistBolt(appConfig, streamMaps); TopologyBuilder builder = new TopologyBuilder(); diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 57fe0e8837..f9534e4a6d 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -27,6 +27,8 @@ import backtype.storm.tuple.Values; import org.apache.eagle.hadoop.queue.HadoopQueueRunningAppConfig; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; import org.apache.eagle.hadoop.queue.model.applications.App; import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo; import org.apache.eagle.hadoop.queue.model.scheduler.QueueStreamInfo; @@ -73,11 +75,11 @@ public void execute(Tuple input) { if (input == null) { return; } - String dataSource = input.getStringByField(HadoopClusterConstants.FIELD_DATASOURCE); - String dataType = input.getStringByField(HadoopClusterConstants.FIELD_DATATYPE); + DataSource dataSource = (DataSource) input.getValueByField(HadoopClusterConstants.FIELD_DATASOURCE); + DataType dataType = (DataType) input.getValueByField(HadoopClusterConstants.FIELD_DATATYPE); Object data = input.getValueByField(HadoopClusterConstants.FIELD_DATA); - if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.STREAM.toString())) { + if (dataType.equals(HadoopClusterConstants.DataType.STREAM)) { List apps = (List) data; for (App app : apps) { collector.emit(streamMap.get(dataSource), new Values(app.getId(), @@ -90,7 +92,8 @@ public void execute(Tuple input) { RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { String queueName = queue.getTags().get(HadoopClusterConstants.TAG_QUEUE); - collector.emit(streamMap.get(dataSource), new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); + collector.emit(streamMap.get(dataSource), + new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); } } } @@ -121,7 +124,7 @@ public void cleanup() { } } - private void writeEntities(List entities, String dataType, String dataSource) { + private void writeEntities(List entities, DataType dataType, DataSource dataSource) { try { GenericServiceAPIResponseEntity response = client.create(entities); if (!response.isSuccess()) { From 84f3945350d4476c4b71fdc0cdf0aebdd5db1027 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 10:09:14 +0800 Subject: [PATCH 3/8] add a topic for app stream data --- ...agle.hadoop.queue.HadoopQueueRunningAppProvider.xml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index c91e272aae..da22836af6 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -55,11 +55,17 @@ - dataSinkConfig.topic - dataSinkConfig.topic + dataSinkConfig.HADOOP_QUEUE_STREAM.topic + Destination(Kafka Topic) Of Queue Stream Data yarn_queue topic for kafka data sink + + dataSinkConfig.ACCEPTED_APP_STREAM.topic + Destination(Kafka Topic) Of App Stream Data + yarn_accepted_app + topic for kafka data sink + dataSinkConfig.brokerList dataSinkConfig.brokerList From 4572e299f6c26ebc2779649febdd53de91347dba Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 10:31:35 +0800 Subject: [PATCH 4/8] fix a bug --- .../hadoop/queue/crawler/ClusterMetricsParseListener.java | 2 +- .../eagle/hadoop/queue/crawler/RunningAppParseListener.java | 4 ++-- .../hadoop/queue/crawler/SchedulerInfoParseListener.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java index 2ecc34f40e..57dd45496f 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/ClusterMetricsParseListener.java @@ -87,7 +87,7 @@ public void onMetric(ClusterMetrics metrics, long currentTimestamp) { public void flush() { HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.CLUSTER_METRIC, System.currentTimeMillis()); List metrics = new ArrayList<>(clusterMetricEntities.values()); - this.collector.emit(new ValuesArray(DataSource.CLUSTER_METRIC.name(), DataType.METRIC.name(), metrics), messageId); + this.collector.emit(new ValuesArray(DataSource.CLUSTER_METRIC, DataType.METRIC, metrics), messageId); reset(); } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index 202ea1abda..637523b1a8 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -72,12 +72,12 @@ public void flush() { logger.info("crawled {} running app metrics", appMetricEntities.size()); HadoopQueueMessageId messageId = new HadoopQueueMessageId(DataType.METRIC, DataSource.RUNNING_APPS, System.currentTimeMillis()); List metrics = new ArrayList<>(appMetricEntities.values()); - collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC.name(), metrics), messageId); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC, metrics), messageId); logger.info("crawled {} accepted apps", acceptedApps.size()); messageId = new HadoopQueueMessageId(DataType.STREAM, DataSource.RUNNING_APPS, System.currentTimeMillis()); List entities = new ArrayList<>(acceptedApps); - collector.emit(new ValuesArray(DataSource.RUNNING_APPS.name(), DataType.STREAM.name(), entities), messageId); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.STREAM, entities), messageId); acceptedApps.clear(); appMetricEntities.clear(); diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 1cc5abcafa..165bdb1000 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -70,12 +70,12 @@ public void flush() { LOG.info("Flushing {} RunningQueue metrics in memory", metricEntities.size()); HadoopQueueMessageId messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.METRIC, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); List metrics = new ArrayList<>(metricEntities); - collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.METRIC.name(), metrics), messageId); + collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.METRIC, metrics), messageId); LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size()); messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.SCHEDULER, System.currentTimeMillis()); List entities = new ArrayList<>(runningQueueAPIEntities); - collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.ENTITY.name(), entities), messageId); + collector.emit(new ValuesArray(DataSource.SCHEDULER, DataType.ENTITY, entities), messageId); runningQueueAPIEntities.clear(); metricEntities.clear(); From 818ca2267947593ba20a26b50857eeb359d75e9e Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 12:58:43 +0800 Subject: [PATCH 5/8] add YarnAppAPIEntity --- .../queue/common/HadoopClusterConstants.java | 3 +- .../crawler/RunningAppParseListener.java | 31 ++++- .../model/HadoopQueueEntityRepository.java | 2 + .../model/applications/AppStreamInfo.java | 32 ++--- .../model/applications/YarnAppAPIEntity.java | 111 ++++++++++++++++++ .../storm/HadoopQueueMetricPersistBolt.java | 16 +-- 6 files changed, 164 insertions(+), 31 deletions(-) create mode 100644 eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 17500197f2..159da219aa 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java @@ -24,7 +24,7 @@ public enum AggregateFunc { } public enum DataType { - METRIC, ENTITY, STREAM + METRIC, ENTITY } public enum DataSource { @@ -67,6 +67,7 @@ public static class MetricName { public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService"; + public static final String ACCEPTED_APP_SERVICE_NAME = "AcceptedAppService"; // tag constants public static final String TAG_PARENT_QUEUE = "parentQueue"; diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index 637523b1a8..451d32d78f 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -29,7 +29,9 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; import org.apache.eagle.hadoop.queue.common.YarnClusterResourceURLBuilder; import org.apache.eagle.hadoop.queue.model.applications.App; +import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo; import org.apache.eagle.hadoop.queue.model.applications.Apps; +import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; import org.apache.eagle.log.entity.GenericMetricEntity; import backtype.storm.spout.SpoutOutputCollector; @@ -60,7 +62,7 @@ public class RunningAppParseListener { private String rmUrl; private SpoutOutputCollector collector; private Map appMetricEntities = new HashMap<>(); - private List acceptedApps = new ArrayList<>(); + private List acceptedApps = new ArrayList<>(); public RunningAppParseListener(String site, SpoutOutputCollector collector, String rmUrl) { this.site = site; @@ -75,9 +77,9 @@ public void flush() { collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.METRIC, metrics), messageId); logger.info("crawled {} accepted apps", acceptedApps.size()); - messageId = new HadoopQueueMessageId(DataType.STREAM, DataSource.RUNNING_APPS, System.currentTimeMillis()); - List entities = new ArrayList<>(acceptedApps); - collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.STREAM, entities), messageId); + messageId = new HadoopQueueMessageId(DataType.ENTITY, DataSource.RUNNING_APPS, System.currentTimeMillis()); + List entities = new ArrayList<>(acceptedApps); + collector.emit(new ValuesArray(DataSource.RUNNING_APPS, DataType.ENTITY, entities), messageId); acceptedApps.clear(); appMetricEntities.clear(); @@ -111,8 +113,16 @@ public void onMetric(Apps apps, long timestamp) throws Exception { timestamp = timestamp / AGGREGATE_INTERVAL * AGGREGATE_INTERVAL; for (App app : apps.getApp()) { if (app.getState().equalsIgnoreCase(HadoopClusterConstants.AppState.ACCEPTED.toString())) { - app.setTrackingUrl(YarnClusterResourceURLBuilder.buildAcceptedAppTrackingURL(rmUrl, app.getId())); - acceptedApps.add(app); + YarnAppAPIEntity appAPIEntity = new YarnAppAPIEntity(); + appAPIEntity.setTags(buildAppTags(app)); + appAPIEntity.setTrackingUrl(YarnClusterResourceURLBuilder.buildAcceptedAppTrackingURL(rmUrl, app.getId())); + appAPIEntity.setAppName(app.getName()); + appAPIEntity.setClusterUsagePercentage(app.getClusterUsagePercentage()); + appAPIEntity.setQueueUsagePercentage(app.getQueueUsagePercentage()); + appAPIEntity.setElapsedTime(app.getElapsedTime()); + appAPIEntity.setStartedTime(app.getStartedTime()); + appAPIEntity.setState(app.getState()); + acceptedApps.add(appAPIEntity); } else { Map tags = new HashMap<>(); tags.put(HadoopClusterConstants.TAG_USER, app.getUser()); @@ -130,6 +140,15 @@ public void onMetric(Apps apps, long timestamp) throws Exception { } } + private Map buildAppTags(App app) { + Map tags = new HashMap<>(); + tags.put(AppStreamInfo.SITE, this.site); + tags.put(AppStreamInfo.ID, app.getId()); + tags.put(AppStreamInfo.QUEUE, app.getQueue()); + tags.put(AppStreamInfo.USER, app.getUser()); + return tags; + } + private enum AggLevel { CLUSTER(HadoopClusterConstants.TAG_CLUSTER, ""), QUEUE(HadoopClusterConstants.TAG_QUEUE, HadoopClusterConstants.TAG_QUEUE), diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java index 40d6e53a07..800bd0345a 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java @@ -17,6 +17,7 @@ */ package org.apache.eagle.hadoop.queue.model; +import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepository; @@ -25,5 +26,6 @@ public class HadoopQueueEntityRepository extends EntityRepository { public HadoopQueueEntityRepository() { this.registerEntity(RunningQueueAPIEntity.class); this.registerEntity(QueueStructureAPIEntity.class); + this.registerEntity(YarnAppAPIEntity.class); } } \ No newline at end of file diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java index 00fceb9932..7e720232bc 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/AppStreamInfo.java @@ -21,11 +21,11 @@ import java.util.Map; public class AppStreamInfo { - private static final String SITE = "site"; - private static final String ID = "id"; - private static final String USER = "user"; + public static final String SITE = "site"; + public static final String ID = "id"; + public static final String USER = "user"; + public static final String QUEUE = "queue"; private static final String NAME = "appName"; - private static final String QUEUE = "queue"; private static final String STATE = "state"; private static final String STARTEDTIME = "startTime"; private static final String ELAPSEDTIME = "elapsedTime"; @@ -33,19 +33,19 @@ public class AppStreamInfo { private static final String CLUSTER_USAGE_PERCENTAGE = "clusterUsagePercentage"; private static final String TRACKING_URL = "trackingUrl"; - public static Map convertAppToStream(App app, String site) { + public static Map convertAppToStream(YarnAppAPIEntity appAPIEntity) { Map queueStreamInfo = new HashMap<>(); - queueStreamInfo.put(SITE, site); - queueStreamInfo.put(ID, app.getId()); - queueStreamInfo.put(USER, app.getUser()); - queueStreamInfo.put(NAME, app.getName()); - queueStreamInfo.put(QUEUE, app.getQueue()); - queueStreamInfo.put(STATE, app.getState()); - queueStreamInfo.put(ELAPSEDTIME, app.getElapsedTime()); - queueStreamInfo.put(STARTEDTIME, app.getStartedTime()); - queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, app.getQueueUsagePercentage()); - queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, app.getClusterUsagePercentage()); - queueStreamInfo.put(TRACKING_URL, app.getTrackingUrl()); + queueStreamInfo.put(SITE, appAPIEntity.getTags().get(SITE)); + queueStreamInfo.put(ID, appAPIEntity.getTags().get(ID)); + queueStreamInfo.put(USER, appAPIEntity.getTags().get(USER)); + queueStreamInfo.put(QUEUE, appAPIEntity.getTags().get(QUEUE)); + queueStreamInfo.put(NAME, appAPIEntity.getAppName()); + queueStreamInfo.put(STATE, appAPIEntity.getState()); + queueStreamInfo.put(ELAPSEDTIME, appAPIEntity.getElapsedTime()); + queueStreamInfo.put(STARTEDTIME, appAPIEntity.getStartedTime()); + queueStreamInfo.put(QUEUE_USAGE_PERCENTAGE, appAPIEntity.getQueueUsagePercentage()); + queueStreamInfo.put(CLUSTER_USAGE_PERCENTAGE, appAPIEntity.getClusterUsagePercentage()); + queueStreamInfo.put(TRACKING_URL, appAPIEntity.getTrackingUrl()); return queueStreamInfo; } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java new file mode 100644 index 0000000000..7b365230b8 --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/applications/YarnAppAPIEntity.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.applications; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("yarn_app") +@ColumnFamily("f") +@Prefix("accepted") +@Service(HadoopClusterConstants.ACCEPTED_APP_SERVICE_NAME) +@TimeSeries(true) +@Partition( {"site"}) +@Tags({"site","id","user","queue"}) +public class YarnAppAPIEntity extends TaggedLogAPIEntity { + @Column("a") + private String appName; + @Column("b") + private String state; + @Column("c") + private long startedTime; + @Column("d") + private long elapsedTime; + @Column("e") + private String trackingUrl; + @Column("f") + private double queueUsagePercentage; + @Column("g") + private double clusterUsagePercentage; + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; + valueChanged("appName"); + } + + public String getState() { + return state; + } + + public void setState(String state) { + this.state = state; + valueChanged("state"); + } + + public long getStartedTime() { + return startedTime; + } + + public void setStartedTime(long startedTime) { + this.startedTime = startedTime; + valueChanged("startedTime"); + } + + public long getElapsedTime() { + return elapsedTime; + } + + public void setElapsedTime(long elapsedTime) { + this.elapsedTime = elapsedTime; + valueChanged("elapsedTime"); + } + + public String getTrackingUrl() { + return trackingUrl; + } + + public void setTrackingUrl(String trackingUrl) { + this.trackingUrl = trackingUrl; + valueChanged("trackingUrl"); + } + + public double getQueueUsagePercentage() { + return queueUsagePercentage; + } + + public void setQueueUsagePercentage(double queueUsagePercentage) { + this.queueUsagePercentage = queueUsagePercentage; + valueChanged("queueUsagePercentage"); + } + + public double getClusterUsagePercentage() { + return clusterUsagePercentage; + } + + public void setClusterUsagePercentage(double clusterUsagePercentage) { + this.clusterUsagePercentage = clusterUsagePercentage; + valueChanged("clusterUsagePercentage"); + } +} diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index f9534e4a6d..5f06c66c21 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -29,8 +29,8 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataSource; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.DataType; -import org.apache.eagle.hadoop.queue.model.applications.App; import org.apache.eagle.hadoop.queue.model.applications.AppStreamInfo; +import org.apache.eagle.hadoop.queue.model.applications.YarnAppAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.QueueStreamInfo; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; @@ -79,14 +79,10 @@ public void execute(Tuple input) { DataType dataType = (DataType) input.getValueByField(HadoopClusterConstants.FIELD_DATATYPE); Object data = input.getValueByField(HadoopClusterConstants.FIELD_DATA); - if (dataType.equals(HadoopClusterConstants.DataType.STREAM)) { - List apps = (List) data; - for (App app : apps) { - collector.emit(streamMap.get(dataSource), new Values(app.getId(), - AppStreamInfo.convertAppToStream(app, config.eagleProps.site))); - } + List entities = (List) data; + if (dataType.equals(DataType.METRIC)) { + writeEntities(entities, dataType, dataSource); } else { - List entities = (List) data; for (TaggedLogAPIEntity entity : entities) { if (entity instanceof RunningQueueAPIEntity) { RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; @@ -95,6 +91,10 @@ public void execute(Tuple input) { collector.emit(streamMap.get(dataSource), new Values(queueName, QueueStreamInfo.convertEntityToStream(queue))); } + } else if (entity instanceof YarnAppAPIEntity) { + YarnAppAPIEntity appAPIEntity = (YarnAppAPIEntity) entity; + collector.emit(streamMap.get(dataSource), + new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity))); } } writeEntities(entities, dataType, dataSource); From ef50a87b97f05812200042ee90674e9aa96bb0db Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 13:08:46 +0800 Subject: [PATCH 6/8] fix tracking url --- .../hadoop/queue/common/YarnClusterResourceURLBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java index a13ddc43c8..7ec24df4eb 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/YarnClusterResourceURLBuilder.java @@ -48,7 +48,7 @@ public static String buildAcceptedAppsURL(String urlBase) { } public static String buildAcceptedAppTrackingURL(String urlBase, String appId) { - return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "/" + appId + ANONYMOUS_PARAMETER); + return PathResolverHelper.buildUrlPath(urlBase, CLUSTER_APPS_API_URL + "/" + appId); } public static String buildFinishedAppsURL(String urlBase) { From 337bb39951c334c13194b1d217c13711b06413c9 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 13:41:37 +0800 Subject: [PATCH 7/8] update --- .../eagle/hadoop/queue/crawler/RunningAppParseListener.java | 1 + .../hadoop/queue/storm/HadoopQueueMetricPersistBolt.java | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java index 451d32d78f..ff54ca3f05 100755 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/RunningAppParseListener.java @@ -122,6 +122,7 @@ public void onMetric(Apps apps, long timestamp) throws Exception { appAPIEntity.setElapsedTime(app.getElapsedTime()); appAPIEntity.setStartedTime(app.getStartedTime()); appAPIEntity.setState(app.getState()); + appAPIEntity.setTimestamp(app.getStartedTime()); acceptedApps.add(appAPIEntity); } else { Map tags = new HashMap<>(); diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 5f06c66c21..43a62b7f49 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -97,7 +97,9 @@ public void execute(Tuple input) { new Values(appAPIEntity.getAppName(), AppStreamInfo.convertAppToStream(appAPIEntity))); } } - writeEntities(entities, dataType, dataSource); + if (!dataSource.equals(DataSource.RUNNING_APPS)) { + writeEntities(entities, dataType, dataSource); + } } this.collector.ack(input); } From f01226be846afd7740ca1f01930d91cd80d48086 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 21 Feb 2017 14:15:58 +0800 Subject: [PATCH 8/8] update --- .../eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java index d25d05b83f..2a99d2649d 100644 --- a/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java +++ b/eagle-jpm/eagle-jpm-util/src/main/java/org/apache/eagle/jpm/util/resourcefetch/ha/AbstractURLSelector.java @@ -91,7 +91,7 @@ public void reSelectUrl() throws IOException { LOG.info("Successfully switch to new url : " + selectedUrl); return; } - LOG.info("try url " + urlToCheck + "fail for " + (time + 1) + " times, sleep 5 seconds before try again. "); + LOG.info("try url " + urlToCheck + " failed for " + (time + 1) + " times, sleep 5 seconds before try again. "); try { Thread.sleep(5 * 1000); } catch (InterruptedException ex) {