From 9622d2cb0669e4305e1332835c21cdf47b7cd0e4 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 14 Feb 2017 11:45:48 +0800 Subject: [PATCH 01/10] add an estimated finished time for a running job --- .../org/apache/eagle/jpm/mr/running/parser/MRJobParser.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java index 6b33d31a2d..525ffc24e4 100644 --- a/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java +++ b/eagle-jpm/eagle-jpm-mr-running/src/main/java/org/apache/eagle/jpm/mr/running/parser/MRJobParser.java @@ -135,6 +135,8 @@ private void finishMRJob(String mrJobId) { JobExecutionAPIEntity jobExecutionAPIEntity = mrJobEntityMap.get(mrJobId); jobExecutionAPIEntity.setInternalState(Constants.AppState.FINISHED.toString()); jobExecutionAPIEntity.setCurrentState(Constants.AppState.RUNNING.toString()); + // set an estimated job finished time because it's hard the get the specific one + jobExecutionAPIEntity.setEndTime(System.currentTimeMillis()); mrJobConfigs.remove(mrJobId); if (mrJobConfigs.size() == 0) { this.parserStatus = ParserStatus.APP_FINISHED; From 803d4b07454de29872546338a37856013815dc48 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 14 Feb 2017 11:50:33 +0800 Subject: [PATCH 02/10] update topic name --- ....apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml | 2 +- ...g.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml | 2 +- .../org.apache.eagle.topology.TopologyCheckAppProvider.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml index 4cf745c112..5fb041d37a 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml +++ b/eagle-jpm/eagle-hadoop-queue/src/main/resources/META-INF/providers/org.apache.eagle.hadoop.queue.HadoopQueueRunningAppProvider.xml @@ -57,7 +57,7 @@ dataSinkConfig.topic dataSinkConfig.topic - hadoop_leaf_queue + yarn_queue topic for kafka data sink diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml index 49694a59a1..19972579a5 100644 --- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml @@ -115,7 +115,7 @@ dataSinkConfig.topic Kafka Topic for Parsed Data Sink - hdfs_audit_log_enriched + hdfs_audit_event topic for kafka data sink diff --git a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml index 87d3202363..1142e1b89e 100644 --- a/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml +++ b/eagle-topology-check/eagle-topology-app/src/main/resources/META-INF/providers/org.apache.eagle.topology.TopologyCheckAppProvider.xml @@ -133,7 +133,7 @@ dataSinkConfig.topic Topic For Kafka Data Sink - topology_health_check + topology_check topic For kafka data sink From 2444b1b301810123321898173a94371d3903292c Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Tue, 14 Feb 2017 17:38:36 +0800 Subject: [PATCH 03/10] add QueueMappingService --- .../queue/common/HadoopClusterConstants.java | 1 + .../crawler/SchedulerInfoParseListener.java | 20 ++++++-- .../model/HadoopQueueEntityRepository.java | 2 + .../model/scheduler/ParentQueueAPIEntity.java | 47 +++++++++++++++++++ .../storm/HadoopQueueMetricPersistBolt.java | 15 ++++-- .../service/jpm/RunningQueueResource.java | 46 ++++++++++++++++++ 6 files changed, 122 insertions(+), 9 deletions(-) create mode 100644 eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java create mode 100644 eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java index 9a08f05d75..1d64f8789e 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/common/HadoopClusterConstants.java @@ -80,6 +80,7 @@ public static class LeafQueueInfo { } public static final String RUNNING_QUEUE_SERVICE_NAME = "RunningQueueService"; + public static final String QUEUE_MAPPING_SERVICE_NAME = "QueueMappingService"; // tag constants public static final String TAG_PARENT_QUEUE = "parentQueue"; diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index b0452c9776..86c0478395 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -23,6 +23,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; import org.apache.eagle.hadoop.queue.model.scheduler.*; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericMetricEntity; import backtype.storm.spout.SpoutOutputCollector; @@ -43,6 +44,7 @@ public class SchedulerInfoParseListener { private final List runningQueueAPIEntities = new ArrayList<>(); private final List metricEntities = new ArrayList<>(); + private final Map queueMap = new HashMap<>(); private String site; private SpoutOutputCollector collector; @@ -56,8 +58,9 @@ public void onMetric(SchedulerInfo scheduler, long currentTimestamp) throws Exce Map tags = buildMetricTags(null, null); createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity()); createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity()); + queueMap.clear(); for (Queue queue : scheduler.getQueues().getQueue()) { - createQueues(queue, currentTimestamp, scheduler, null); + createQueues(queue, currentTimestamp, scheduler, null, queueMap); } } @@ -69,11 +72,18 @@ public void flush() { LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size()); messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); - List entities = new ArrayList<>(runningQueueAPIEntities); + List entities = new ArrayList<>(runningQueueAPIEntities); + + ParentQueueAPIEntity parentQueueAPIEntity = new ParentQueueAPIEntity(); + parentQueueAPIEntity.setQueueMap(new HashMap<>(queueMap)); + parentQueueAPIEntity.setTags(buildMetricTags(null, null)); + entities.add(parentQueueAPIEntity); + collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId); runningQueueAPIEntities.clear(); metricEntities.clear(); + queueMap.clear(); } private Map buildMetricTags(String queueName, String parentQueueName) { @@ -97,7 +107,9 @@ private void createMetric(String metricName, Map tags, long time this.metricEntities.add(e); } - private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception { + private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName, + Map queueMap) throws Exception { + queueMap.put(queue.getQueueName(), parentQueueName); RunningQueueAPIEntity _entity = new RunningQueueAPIEntity(); Map _tags = buildMetricTags(queue.getQueueName(), parentQueueName); _entity.setTags(_tags); @@ -147,7 +159,7 @@ private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo sche if (queue.getQueues() != null && queue.getQueues().getQueue() != null) { for (Queue subQueue : queue.getQueues().getQueue()) { - createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName()); + createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName(), queueMap); } } } diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java index f598779522..aaeb3a32f0 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java @@ -17,11 +17,13 @@ */ package org.apache.eagle.hadoop.queue.model; +import org.apache.eagle.hadoop.queue.model.scheduler.ParentQueueAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepository; public class HadoopQueueEntityRepository extends EntityRepository { public HadoopQueueEntityRepository() { this.registerEntity(RunningQueueAPIEntity.class); + this.registerEntity(ParentQueueAPIEntity.class); } } \ No newline at end of file diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java new file mode 100644 index 0000000000..65a376ad4c --- /dev/null +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.hadoop.queue.model.scheduler; + +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; +import org.apache.eagle.log.entity.meta.*; + +import java.util.Map; + +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +@Table("queue_map") +@ColumnFamily("f") +@Prefix("queueMap") +@Service(HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME) +@TimeSeries(false) +@Partition( {"site"}) +public class ParentQueueAPIEntity extends TaggedLogAPIEntity { + @Column("a") + Map queueMap; + + public Map getQueueMap() { + return queueMap; + } + + public void setQueueMap(Map queueMap) { + this.queueMap = queueMap; + valueChanged("queueMap"); + } + +} diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java index 1bafc13d11..9eb7008748 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/storm/HadoopQueueMetricPersistBolt.java @@ -30,6 +30,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.LeafQueueInfo; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.UserWrapper; +import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericMetricEntity; import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; import org.apache.eagle.service.client.IEagleServiceClient; @@ -72,10 +73,14 @@ public void execute(Tuple input) { List metrics = (List) data; writeMetrics(metrics); } else if (dataType.equalsIgnoreCase(HadoopClusterConstants.DataType.ENTITY.toString())) { - List entities = (List) data; - for (RunningQueueAPIEntity queue : entities) { - if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { - collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), parseLeafQueueInfo(queue))); + List entities = (List) data; + for (TaggedLogAPIEntity entity : entities) { + if (entity instanceof RunningQueueAPIEntity) { + RunningQueueAPIEntity queue = (RunningQueueAPIEntity) entity; + if (queue.getUsers() != null && !queue.getUsers().getUsers().isEmpty() && queue.getMemory() != 0) { + collector.emit(new Values(queue.getTags().get(HadoopClusterConstants.TAG_QUEUE), + parseLeafQueueInfo(queue))); + } } } writeEntities(entities); @@ -99,7 +104,7 @@ public void cleanup() { } } - private void writeEntities(List entities) { + private void writeEntities(List entities) { try { GenericServiceAPIResponseEntity response = client.create(entities); if (!response.isSuccess()) { diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java new file mode 100644 index 0000000000..0a2f15b9ac --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm; + +import javax.ws.rs.Consumes; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import java.util.Map; + +@Path("queue") +public class RunningQueueResource { + + @Path("/{queueName}/users") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Map getUsersByQueue(@QueryParam("top") int top, @QueryParam("currentTime") long currentTime) { + return null; + } + + @Path("/{queueName}/jobs") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Map getJobsByQueue(@QueryParam("top") int top, @QueryParam("currentTime") long currentTime) { + return null; + } + + + +} From b7537fb00d48e0f217c80de14e655642bd490c8f Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Wed, 15 Feb 2017 18:33:16 +0800 Subject: [PATCH 04/10] add RunningQueueResource --- .../crawler/SchedulerInfoParseListener.java | 34 ++--- .../model/HadoopQueueEntityRepository.java | 4 +- ...tity.java => QueueStructureAPIEntity.java} | 27 ++-- eagle-jpm/eagle-jpm-service/pom.xml | 5 + .../service/jpm/RunningQueueResource.java | 134 ++++++++++++++++-- .../service/jpm/RunningQueueResponse.java | 50 +++++++ 6 files changed, 215 insertions(+), 39 deletions(-) rename eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/{ParentQueueAPIEntity.java => QueueStructureAPIEntity.java} (69%) create mode 100644 eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 86c0478395..2a2f3205db 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -22,6 +22,7 @@ import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants; import org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.MetricName; import org.apache.eagle.hadoop.queue.model.scheduler.*; +import org.apache.eagle.hadoop.queue.model.scheduler.Queue; import org.apache.eagle.hadoop.queue.storm.HadoopQueueMessageId; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.GenericMetricEntity; @@ -42,9 +43,8 @@ public class SchedulerInfoParseListener { //private final static long AGGREGATE_INTERVAL = DateTimeUtil.ONEMINUTE; //private int MAX_CACHE_COUNT = 1000; - private final List runningQueueAPIEntities = new ArrayList<>(); + private final List runningQueueAPIEntities = new ArrayList<>(); private final List metricEntities = new ArrayList<>(); - private final Map queueMap = new HashMap<>(); private String site; private SpoutOutputCollector collector; @@ -58,9 +58,9 @@ public void onMetric(SchedulerInfo scheduler, long currentTimestamp) throws Exce Map tags = buildMetricTags(null, null); createMetric(MetricName.HADOOP_CLUSTER_CAPACITY, tags, currentTimestamp, scheduler.getCapacity()); createMetric(MetricName.HADOOP_CLUSTER_USED_CAPACITY, tags, currentTimestamp, scheduler.getUsedCapacity()); - queueMap.clear(); + for (Queue queue : scheduler.getQueues().getQueue()) { - createQueues(queue, currentTimestamp, scheduler, null, queueMap); + createQueues(queue, currentTimestamp, scheduler, null); } } @@ -73,17 +73,10 @@ public void flush() { LOG.info("Flushing {} RunningQueueEntities in memory", runningQueueAPIEntities.size()); messageId = new HadoopQueueMessageId(HadoopClusterConstants.DataType.ENTITY, HadoopClusterConstants.DataSource.SCHEDULER, System.currentTimeMillis()); List entities = new ArrayList<>(runningQueueAPIEntities); - - ParentQueueAPIEntity parentQueueAPIEntity = new ParentQueueAPIEntity(); - parentQueueAPIEntity.setQueueMap(new HashMap<>(queueMap)); - parentQueueAPIEntity.setTags(buildMetricTags(null, null)); - entities.add(parentQueueAPIEntity); - collector.emit(new ValuesArray(HadoopClusterConstants.DataType.ENTITY.name(), entities), messageId); runningQueueAPIEntities.clear(); metricEntities.clear(); - queueMap.clear(); } private Map buildMetricTags(String queueName, String parentQueueName) { @@ -107,9 +100,7 @@ private void createMetric(String metricName, Map tags, long time this.metricEntities.add(e); } - private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName, - Map queueMap) throws Exception { - queueMap.put(queue.getQueueName(), parentQueueName); + private List createQueues(Queue queue, long currentTimestamp, SchedulerInfo scheduler, String parentQueueName) throws Exception { RunningQueueAPIEntity _entity = new RunningQueueAPIEntity(); Map _tags = buildMetricTags(queue.getQueueName(), parentQueueName); _entity.setTags(_tags); @@ -135,7 +126,6 @@ private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo sche UserWrappers users = new UserWrappers(); users.setUsers(userList); _entity.setUsers(users); - runningQueueAPIEntities.add(_entity); createMetric(MetricName.HADOOP_QUEUE_NUMPENDING_JOBS, _tags, currentTimestamp, queue.getNumPendingApplications()); @@ -157,11 +147,23 @@ private void createQueues(Queue queue, long currentTimestamp, SchedulerInfo sche } } + List subQueues = new ArrayList<>(); + List allSubQueues = new ArrayList<>(); if (queue.getQueues() != null && queue.getQueues().getQueue() != null) { for (Queue subQueue : queue.getQueues().getQueue()) { - createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName(), queueMap); + subQueues.add(subQueue.getQueueName()); + allSubQueues.add(subQueue.getQueueName()); + List queues = createQueues(subQueue, currentTimestamp, scheduler, queue.getQueueName()); + allSubQueues.addAll(queues); } } + QueueStructureAPIEntity queueStructureAPIEntity = new QueueStructureAPIEntity(); + queueStructureAPIEntity.setTags(_tags); + queueStructureAPIEntity.setSubQueues(subQueues); + queueStructureAPIEntity.setAllSubQueues(allSubQueues); + runningQueueAPIEntities.add(queueStructureAPIEntity); + + return allSubQueues; } private UserWrapper wrapUser(User user) { diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java index aaeb3a32f0..40d6e53a07 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/HadoopQueueEntityRepository.java @@ -17,13 +17,13 @@ */ package org.apache.eagle.hadoop.queue.model; -import org.apache.eagle.hadoop.queue.model.scheduler.ParentQueueAPIEntity; +import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; import org.apache.eagle.hadoop.queue.model.scheduler.RunningQueueAPIEntity; import org.apache.eagle.log.entity.repo.EntityRepository; public class HadoopQueueEntityRepository extends EntityRepository { public HadoopQueueEntityRepository() { this.registerEntity(RunningQueueAPIEntity.class); - this.registerEntity(ParentQueueAPIEntity.class); + this.registerEntity(QueueStructureAPIEntity.class); } } \ No newline at end of file diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java similarity index 69% rename from eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java rename to eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java index 65a376ad4c..5031f7b81c 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/ParentQueueAPIEntity.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java @@ -22,7 +22,7 @@ import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.meta.*; -import java.util.Map; +import java.util.List; @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) @Table("queue_map") @@ -31,17 +31,28 @@ @Service(HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME) @TimeSeries(false) @Partition( {"site"}) -public class ParentQueueAPIEntity extends TaggedLogAPIEntity { +public class QueueStructureAPIEntity extends TaggedLogAPIEntity { @Column("a") - Map queueMap; + private List subQueues; + @Column("b") + private List allSubQueues; - public Map getQueueMap() { - return queueMap; + public List getSubQueues() { + return subQueues; } - public void setQueueMap(Map queueMap) { - this.queueMap = queueMap; - valueChanged("queueMap"); + public void setSubQueues(List subQueues) { + this.subQueues = subQueues; + valueChanged("subQueues"); + } + + public List getAllSubQueues() { + return allSubQueues; + } + + public void setAllSubQueues(List allSubQueues) { + this.allSubQueues = allSubQueues; + valueChanged("allSubQueues"); } } diff --git a/eagle-jpm/eagle-jpm-service/pom.xml b/eagle-jpm/eagle-jpm-service/pom.xml index d6807bd9e3..197740ee84 100644 --- a/eagle-jpm/eagle-jpm-service/pom.xml +++ b/eagle-jpm/eagle-jpm-service/pom.xml @@ -43,5 +43,10 @@ eagle-jpm-entity ${project.version} + + org.apache.eagle + eagle-hadoop-queue + ${project.version} + \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java index 0a2f15b9ac..ef9a7d0c83 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -17,30 +17,138 @@ package org.apache.eagle.service.jpm; -import javax.ws.rs.Consumes; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; +import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.utils.Tuple2; +import org.apache.eagle.hadoop.queue.model.scheduler.QueueStructureAPIEntity; +import org.apache.eagle.jpm.mr.runningentity.JobExecutionAPIEntity; +import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity; +import org.apache.eagle.service.generic.GenericEntityServiceResource; + +import javax.ws.rs.*; import javax.ws.rs.core.MediaType; -import java.util.Map; +import java.io.IOException; +import java.text.ParseException; +import java.util.*; + +import static org.apache.eagle.hadoop.queue.common.HadoopClusterConstants.QUEUE_MAPPING_SERVICE_NAME; +import static org.apache.eagle.jpm.util.Constants.JPA_JOB_EXECUTION_SERVICE_NAME; +import static org.apache.eagle.jpm.util.Constants.JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME; +import static org.apache.eagle.jpm.util.MRJobTagName.JOB_ID; +import static org.apache.eagle.jpm.util.MRJobTagName.JOB_QUEUE; +import static org.apache.eagle.jpm.util.MRJobTagName.USER; @Path("queue") public class RunningQueueResource { - @Path("/{queueName}/users") + @GET + @Path("memory") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public Map getUsersByQueue(@QueryParam("top") int top, @QueryParam("currentTime") long currentTime) { - return null; + public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, + @QueryParam("queue") String queue, + @QueryParam("currentTime") long currentTime, + @QueryParam("top") int top) { + RunningQueueResponse result = new RunningQueueResponse(); + try { + if (site == null || queue == null || currentTime == 0L || top == 0) { + throw new Exception("Invalid query parameters: site == null || queue == null || currentTime == 0L || top == 0"); + } + Tuple2 queryTimeRange = getQueryTimeRange(currentTime); + Map> queueMap = getQueueMap(site); + List runningJobs = getRunningJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1()); + List jobs = getJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1()); + Set jobIds = new HashSet<>(); + jobs.forEach(job -> jobIds.add(job.getTags().get(JOB_ID.toString()))); + + TreeMap sortedJobUsage = new TreeMap<>(); + Map userUsage = new HashMap<>(); + for (JobExecutionAPIEntity job : runningJobs) { + String jobId = job.getTags().get(JOB_ID.toString()); + String jobQueue = job.getTags().get(JOB_QUEUE.toString()); + String user = job.getTags().get(USER.toString()); + + if (jobIds.contains(jobId) && queueMap.get(queue).contains(jobQueue)) { + if (userUsage.containsKey(user)) { + userUsage.put(user, userUsage.get(user) + job.getAllocatedMB()); + } else { + userUsage.put(user, 0L); + } + sortedJobUsage.put(job.getAllocatedMB(), jobId); + } + } + + TreeMap sortedUserUsage = new TreeMap<>(); + for (Map.Entry entry : userUsage.entrySet()) { + sortedUserUsage.put(entry.getValue(), entry.getKey()); + } + result.setJobs(getTopRecords(top, sortedJobUsage)); + result.setUsers(getTopRecords(top, sortedUserUsage)); + } catch (Exception e) { + result.setErrMessage(e.getMessage()); + } + return result; } - @Path("/{queueName}/jobs") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public Map getJobsByQueue(@QueryParam("top") int top, @QueryParam("currentTime") long currentTime) { - return null; + private List getRunningJobs(String site, long currentTime, String startTime, String endTime) throws Exception { + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + String query = String.format("%s[@site=\"%s\" and @startTime<=%s and (@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, @allocatedMB}", JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime); + GenericServiceAPIResponseEntity runningJobResponse = resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); + + if (!runningJobResponse.isSuccess() || runningJobResponse.getObj() == null) { + throw new IOException(runningJobResponse.getException()); + } + + return runningJobResponse.getObj(); } + private List getJobs(String site, long currentTime, String startTime, String endTime) throws Exception{ + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + String query = String.format("%s[@site=\"%s\" and @startTime<=%s and @endTime>%s]{@jobId}", JPA_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime); + + GenericServiceAPIResponseEntity response = + resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); + if (!response.isSuccess() || response.getObj() == null) { + throw new IOException(response.getException()); + } + return response.getObj(); + } + + private Map> getQueueMap(String site) throws IOException { + GenericEntityServiceResource resource = new GenericEntityServiceResource(); + + String query = String.format("%s[@site=\"%s\"]{*}", QUEUE_MAPPING_SERVICE_NAME, site); + GenericServiceAPIResponseEntity responseEntity = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); + + if (!responseEntity.isSuccess() || responseEntity.getObj() == null) { + throw new IOException(responseEntity.getException()); + } + Map> result = new HashMap<>(); + for(QueueStructureAPIEntity entity : responseEntity.getObj()) { + String queue = entity.getTags().get("queue"); + Set subQueues = new HashSet<>(); + subQueues.addAll(entity.getAllSubQueues()); + result.put(queue, subQueues); + } + return result; + } + + private Tuple2 getQueryTimeRange(long currentTime) throws ParseException { + String startTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime - DateTimeUtil.ONEHOUR * 12); + String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(currentTime + DateTimeUtil.ONEMINUTE); + return new Tuple2<>(startTime, endTime); + } + + private Map getTopRecords(int top, TreeMap map) { + Map newMap = new LinkedHashMap<>(); + for (Map.Entry entry : map.entrySet()) { + if (newMap.size() < top) { + newMap.put(entry.getValue(), entry.getKey()); + } else { + break; + } + } + return newMap; + } } diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java new file mode 100644 index 0000000000..1281b667db --- /dev/null +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResponse.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.service.jpm; + +import java.util.Map; + +public class RunningQueueResponse { + private String errMessage; + private Map jobs; + private Map users; + + public String getErrMessage() { + return errMessage; + } + + public void setErrMessage(String errMessage) { + this.errMessage = errMessage; + } + + public Map getJobs() { + return jobs; + } + + public void setJobs(Map jobs) { + this.jobs = jobs; + } + + public Map getUsers() { + return users; + } + + public void setUsers(Map users) { + this.users = users; + } +} From 35cd040d05d442de092cfbcf993a30a8196e85ef Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Wed, 15 Feb 2017 19:41:57 +0800 Subject: [PATCH 05/10] fix a bug --- .../service/jpm/RunningQueueResource.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java index ef9a7d0c83..0e311ec755 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -60,29 +60,25 @@ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, Set jobIds = new HashSet<>(); jobs.forEach(job -> jobIds.add(job.getTags().get(JOB_ID.toString()))); - TreeMap sortedJobUsage = new TreeMap<>(); Map userUsage = new HashMap<>(); + Map jobUsage = new HashMap<>(); for (JobExecutionAPIEntity job : runningJobs) { String jobId = job.getTags().get(JOB_ID.toString()); String jobQueue = job.getTags().get(JOB_QUEUE.toString()); String user = job.getTags().get(USER.toString()); - if (jobIds.contains(jobId) && queueMap.get(queue).contains(jobQueue)) { + if (jobIds.contains(jobId) && queueMap.containsKey(queue) + && (queueMap.containsKey(jobQueue) || queueMap.get(queue).contains(jobQueue))) { if (userUsage.containsKey(user)) { userUsage.put(user, userUsage.get(user) + job.getAllocatedMB()); } else { userUsage.put(user, 0L); } - sortedJobUsage.put(job.getAllocatedMB(), jobId); + jobUsage.put(jobId, job.getAllocatedMB()); } } - - TreeMap sortedUserUsage = new TreeMap<>(); - for (Map.Entry entry : userUsage.entrySet()) { - sortedUserUsage.put(entry.getValue(), entry.getKey()); - } - result.setJobs(getTopRecords(top, sortedJobUsage)); - result.setUsers(getTopRecords(top, sortedUserUsage)); + result.setJobs(getTopRecords(top, jobUsage)); + result.setUsers(getTopRecords(top, userUsage)); } catch (Exception e) { result.setErrMessage(e.getMessage()); } @@ -140,11 +136,14 @@ private Tuple2 getQueryTimeRange(long currentTime) throws ParseE return new Tuple2<>(startTime, endTime); } - private Map getTopRecords(int top, TreeMap map) { + private Map getTopRecords(int top, Map map) { Map newMap = new LinkedHashMap<>(); - for (Map.Entry entry : map.entrySet()) { + + List> list = new ArrayList<>(map.entrySet()); + Collections.sort(list, (o1, o2) -> o1.getValue() >= o2.getValue() ? 1 : -1); + for (Map.Entry entry : list) { if (newMap.size() < top) { - newMap.put(entry.getValue(), entry.getKey()); + newMap.put(entry.getKey(), entry.getValue()); } else { break; } From 70141550df0d2035945c9e2b475d6da8fc794886 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Wed, 15 Feb 2017 19:52:50 +0800 Subject: [PATCH 06/10] fix bugs --- .../java/org/apache/eagle/service/jpm/RunningQueueResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java index 0e311ec755..acfa665ca3 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -140,7 +140,7 @@ private Map getTopRecords(int top, Map map) { Map newMap = new LinkedHashMap<>(); List> list = new ArrayList<>(map.entrySet()); - Collections.sort(list, (o1, o2) -> o1.getValue() >= o2.getValue() ? 1 : -1); + Collections.sort(list, (o1, o2) -> o1.getValue() < o2.getValue() ? 1 : -1); for (Map.Entry entry : list) { if (newMap.size() < top) { newMap.put(entry.getKey(), entry.getValue()); From 03a3000130c24a21293dc70c7958de9fc3dec450 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 16 Feb 2017 09:50:03 +0800 Subject: [PATCH 07/10] refine conde --- .../service/jpm/RunningQueueResource.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java index acfa665ca3..3451e98e7a 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -54,7 +54,7 @@ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, throw new Exception("Invalid query parameters: site == null || queue == null || currentTime == 0L || top == 0"); } Tuple2 queryTimeRange = getQueryTimeRange(currentTime); - Map> queueMap = getQueueMap(site); + Set queueSet = getSubQueueSet(site, queue); List runningJobs = getRunningJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1()); List jobs = getJobs(site, currentTime, queryTimeRange.f0(), queryTimeRange.f1()); Set jobIds = new HashSet<>(); @@ -67,8 +67,7 @@ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, String jobQueue = job.getTags().get(JOB_QUEUE.toString()); String user = job.getTags().get(USER.toString()); - if (jobIds.contains(jobId) && queueMap.containsKey(queue) - && (queueMap.containsKey(jobQueue) || queueMap.get(queue).contains(jobQueue))) { + if (jobIds.contains(jobId) && queueSet.contains(jobQueue)) { if (userUsage.containsKey(user)) { userUsage.put(user, userUsage.get(user) + job.getAllocatedMB()); } else { @@ -111,23 +110,21 @@ private List getJob return response.getObj(); } - private Map> getQueueMap(String site) throws IOException { + private Set getSubQueueSet(String site, String parentQueue) throws IOException { GenericEntityServiceResource resource = new GenericEntityServiceResource(); - String query = String.format("%s[@site=\"%s\"]{*}", QUEUE_MAPPING_SERVICE_NAME, site); + String query = String.format("%s[@site=\"%s\" and @queue=\"%s\"]{*}", QUEUE_MAPPING_SERVICE_NAME, site, parentQueue); GenericServiceAPIResponseEntity responseEntity = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); if (!responseEntity.isSuccess() || responseEntity.getObj() == null) { throw new IOException(responseEntity.getException()); } - Map> result = new HashMap<>(); - for(QueueStructureAPIEntity entity : responseEntity.getObj()) { - String queue = entity.getTags().get("queue"); - Set subQueues = new HashSet<>(); - subQueues.addAll(entity.getAllSubQueues()); - result.put(queue, subQueues); - } - return result; + + Set subQueues = new HashSet<>(); + subQueues.add(parentQueue); + subQueues.addAll(responseEntity.getObj().get(0).getAllSubQueues()); + + return subQueues; } private Tuple2 getQueryTimeRange(long currentTime) throws ParseException { From f07c392f2c2f63d93ff80c7309051aec5aea3790 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 16 Feb 2017 10:04:40 +0800 Subject: [PATCH 08/10] fix a bug --- .../java/org/apache/eagle/service/jpm/RunningQueueResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java index 3451e98e7a..dc87f2d405 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -71,7 +71,7 @@ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, if (userUsage.containsKey(user)) { userUsage.put(user, userUsage.get(user) + job.getAllocatedMB()); } else { - userUsage.put(user, 0L); + userUsage.put(user, job.getAllocatedMB()); } jobUsage.put(jobId, job.getAllocatedMB()); } From cc73d6eed3f87abe0a22907f1811b5635277e672 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 16 Feb 2017 10:19:21 +0800 Subject: [PATCH 09/10] add LastUpdateTime in QueueStructureAPIEntity --- .../queue/crawler/SchedulerInfoParseListener.java | 1 + .../model/scheduler/QueueStructureAPIEntity.java | 11 +++++++++++ 2 files changed, 12 insertions(+) diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java index 2a2f3205db..67cc5c9174 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/crawler/SchedulerInfoParseListener.java @@ -161,6 +161,7 @@ private List createQueues(Queue queue, long currentTimestamp, SchedulerI queueStructureAPIEntity.setTags(_tags); queueStructureAPIEntity.setSubQueues(subQueues); queueStructureAPIEntity.setAllSubQueues(allSubQueues); + queueStructureAPIEntity.setLastUpdateTime(currentTimestamp); runningQueueAPIEntities.add(queueStructureAPIEntity); return allSubQueues; diff --git a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java index 5031f7b81c..72f67bce78 100644 --- a/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java +++ b/eagle-jpm/eagle-hadoop-queue/src/main/java/org/apache/eagle/hadoop/queue/model/scheduler/QueueStructureAPIEntity.java @@ -36,6 +36,8 @@ public class QueueStructureAPIEntity extends TaggedLogAPIEntity { private List subQueues; @Column("b") private List allSubQueues; + @Column("c") + private long lastUpdateTime; public List getSubQueues() { return subQueues; @@ -55,4 +57,13 @@ public void setAllSubQueues(List allSubQueues) { valueChanged("allSubQueues"); } + public long getLastUpdateTime() { + return lastUpdateTime; + } + + public void setLastUpdateTime(long lastUpdateTime) { + this.lastUpdateTime = lastUpdateTime; + valueChanged("lastUpdateTime"); + } + } From 3d85be86249ac9043926d5ac6dd44fc4622655fb Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 16 Feb 2017 11:11:47 +0800 Subject: [PATCH 10/10] fix code style bugs --- .../apache/eagle/service/jpm/RunningQueueResource.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java index dc87f2d405..2632423d1c 100644 --- a/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java +++ b/eagle-jpm/eagle-jpm-service/src/main/java/org/apache/eagle/service/jpm/RunningQueueResource.java @@ -78,7 +78,7 @@ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, } result.setJobs(getTopRecords(top, jobUsage)); result.setUsers(getTopRecords(top, userUsage)); - } catch (Exception e) { + } catch (Exception e) { result.setErrMessage(e.getMessage()); } return result; @@ -86,7 +86,8 @@ public RunningQueueResponse getTopByQueue(@QueryParam("site") String site, private List getRunningJobs(String site, long currentTime, String startTime, String endTime) throws Exception { GenericEntityServiceResource resource = new GenericEntityServiceResource(); - String query = String.format("%s[@site=\"%s\" and @startTime<=%s and (@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, @allocatedMB}", JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime); + String query = String.format("%s[@site=\"%s\" and @startTime<=%s and (@internalState=\"RUNNING\" or @endTime>%s)]{@jobId, @user, @queue, @allocatedMB}", + JPA_RUNNING_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime); GenericServiceAPIResponseEntity runningJobResponse = resource.search(query, startTime, endTime, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false); if (!runningJobResponse.isSuccess() || runningJobResponse.getObj() == null) { @@ -96,7 +97,10 @@ private List getRunningJobs(String site, long currentTime return runningJobResponse.getObj(); } - private List getJobs(String site, long currentTime, String startTime, String endTime) throws Exception{ + private List getJobs(String site, + long currentTime, + String startTime, + String endTime) throws Exception { GenericEntityServiceResource resource = new GenericEntityServiceResource(); String query = String.format("%s[@site=\"%s\" and @startTime<=%s and @endTime>%s]{@jobId}", JPA_JOB_EXECUTION_SERVICE_NAME, site, currentTime, currentTime);