From 82a0db78e9689ca99e0f1e8ec276f2fbd4615a30 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Fri, 3 Mar 2017 10:34:57 +0800 Subject: [PATCH 01/11] add jdbc storage support for sla job meta --- eagle-jpm/eagle-jpm-analyzer/pom.xml | 5 + .../analyzer/meta/MetaManagementService.java | 16 +- .../impl/MetaManagementServiceJDBCImpl.java | 180 ++++++++++++++++-- .../impl/MetaManagementServiceMemoryImpl.java | 79 ++++---- .../impl/orm/JobMetaEntityToRelation.java | 62 ++++++ .../impl/orm/RelationToJobMetaEntity.java | 76 ++++++++ .../impl/orm/RelationToUserEmailEntity.java | 37 ++++ .../impl/orm/UserEmailEntityToRelation.java | 57 ++++++ .../analyzer/meta/model/AnalyzerEntity.java | 6 +- .../analyzer/meta/model/JobMetaEntity.java | 16 +- ...lisherEntity.java => UserEmailEntity.java} | 26 ++- .../analyzer/mr/MRJobPerformanceAnalyzer.java | 2 +- .../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 10 +- .../UnExpectedLongDurationJobProcessor.java | 2 +- .../analyzer/publisher/EmailPublisher.java | 18 +- .../dedup/impl/SimpleDeduplicator.java | 4 +- .../analyzer/resource/AnalyzerResource.java | 87 ++++++--- .../eagle/jpm/analyzer/util/Constants.java | 16 +- .../apache/eagle/jpm/analyzer/util/Utils.java | 34 +++- .../src/main/resources/createTable.sql | 26 ++- .../MRHistoryJobApplicationProvider.java | 13 +- 21 files changed, 632 insertions(+), 140 deletions(-) create mode 100644 eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java create mode 100644 eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java create mode 100644 eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java create mode 100644 eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java rename eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/{PublisherEntity.java => UserEmailEntity.java} (71%) diff --git a/eagle-jpm/eagle-jpm-analyzer/pom.xml b/eagle-jpm/eagle-jpm-analyzer/pom.xml index 07f5766ced..a2943dfa7c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/pom.xml +++ b/eagle-jpm/eagle-jpm-analyzer/pom.xml @@ -55,5 +55,10 @@ eagle-app-base ${project.version} + + org.apache.eagle + eagle-metadata-jdbc + ${project.version} + \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java index 09352661e9..73b7b8178a 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/MetaManagementService.java @@ -18,22 +18,24 @@ package org.apache.eagle.jpm.analyzer.meta; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import java.util.List; public interface MetaManagementService { boolean addJobMeta(JobMetaEntity jobMetaEntity); - boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity); + boolean updateJobMeta(JobMetaEntity jobMetaEntity); - List getJobMeta(String jobDefId); + List getJobMeta(String siteId, String jobDefId); - boolean deleteJobMeta(String jobDefId); + boolean deleteJobMeta(String siteId, String jobDefId); - boolean addPublisherMeta(PublisherEntity publisherEntity); + boolean addUserEmailMeta(UserEmailEntity userEmailEntity); - boolean deletePublisherMeta(String userId); + boolean updateUserEmailMeta(UserEmailEntity userEmailEntity); - List getPublisherMeta(String userId); + boolean deleteUserEmailMeta(String siteId, String userId); + + List getUserEmailMeta(String siteId, String userId); } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java index cfb50294e1..2048e97773 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceJDBCImpl.java @@ -17,61 +17,215 @@ package org.apache.eagle.jpm.analyzer.meta.impl; -import com.google.inject.Inject; import com.typesafe.config.Config; +import org.apache.commons.lang.StringUtils; import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.JobMetaEntityToRelation; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.RelationToJobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.RelationToUserEmailEntity; +import org.apache.eagle.jpm.analyzer.meta.impl.orm.UserEmailEntityToRelation; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; +import org.apache.eagle.metadata.store.jdbc.JDBCMetadataQueryService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.io.Serializable; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; public class MetaManagementServiceJDBCImpl implements MetaManagementService, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceJDBCImpl.class); + private static final String addJobMetaSql = "INSERT INTO analysis_jobs(uuid, configuration, evaluators, createdtime, modifiedtime, siteId, jobDefId) VALUES (?, ?, ?, ?, ?, ?, ?)"; + private static final String addUserEmailSql = "INSERT INTO analysis_email(uuid, mailAddress, createdtime, modifiedtime, siteId, userId) VALUES (?, ?, ?, ?, ?, ?)"; + + private static final String getJobMetaSql = "SELECT * FROM analysis_jobs where siteId = ? and jobDefId = ?"; + private static final String getUserEmailSql = "SELECT * FROM analysis_email where siteId = ? and userId = ?"; + + private static final String deleteJobMetaSql = "DELETE FROM analysis_jobs where siteId = ? and jobDefId = ?"; + private static final String deleteUserEmailSql = "DELETE FROM analysis_email where siteId = ? and userId = ?"; + @Inject Config config; + @Inject + JDBCMetadataQueryService queryService; + @Override public boolean addJobMeta(JobMetaEntity jobMetaEntity) { - + if (getJobMeta(jobMetaEntity.getSiteId(), jobMetaEntity.getJobDefId()) != null) { + throw new IllegalArgumentException("Duplicated job meta: " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId()); + } + + List entities = new ArrayList<>(1); + entities.add(jobMetaEntity); + try { + queryService.insert(addJobMetaSql, entities, new JobMetaEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to insert JobMetaEntity: {}", jobMetaEntity, e); + return false; + } return true; } @Override - public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) { - + public boolean updateJobMeta(JobMetaEntity entity) { + String updateSql = "update analysis_jobs set "; + if (entity.getUuid() != null && !entity.getUuid().isEmpty()) { + updateSql += "uuid = ?, "; + } + if (entity.getConfiguration() != null) { + updateSql += "configuration = ?, "; + } + if (entity.getEvaluators() != null) { + updateSql += "evaluators = ?, "; + } + if (entity.getCreatedTime() > 0) { + updateSql += "createdtime = ?, "; + } + if (entity.getModifiedTime() > 0) { + updateSql += "modifiedtime = ?, "; + } + updateSql = updateSql.substring(0, updateSql.length() - 2); + if (StringUtils.isNotBlank(entity.getSiteId())) { + updateSql += " where siteId = ?"; + } + if (StringUtils.isNotBlank(entity.getJobDefId())) { + updateSql += " and jobDefId = ?"; + } + + try { + if (queryService.update(updateSql, entity, new JobMetaEntityToRelation()) == 0) { + LOG.warn("failed to execute {}", updateSql); + } + } catch (SQLException e) { + LOG.warn("failed to execute {}, {}", updateSql, e); + return false; + } return true; } @Override - public List getJobMeta(String jobDefId) { - - return null; + public List getJobMeta(String siteId, String jobDefId) { + JobMetaEntity jobMetaEntity = new JobMetaEntity(); + jobMetaEntity.setSiteId(siteId); + jobMetaEntity.setJobDefId(jobDefId); + + List results; + try { + results = queryService.queryWithCond(getJobMetaSql, jobMetaEntity, new JobMetaEntityToRelation(), new RelationToJobMetaEntity()); + } catch (SQLException e) { + LOG.error("Error to getJobMeta : {}", e); + return null; + } + if (results.isEmpty()) { + return null; + } + + return results; } @Override - public boolean deleteJobMeta(String jobDefId) { + public boolean deleteJobMeta(String siteId, String jobDefId) { + JobMetaEntity entity = new JobMetaEntity(); + entity.setSiteId(siteId); + entity.setJobDefId(jobDefId); + try { + queryService.update(deleteJobMetaSql, entity, new JobMetaEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to delete JobMetaEntity: {}", entity, e); + return false; + } return true; } @Override - public boolean addPublisherMeta(PublisherEntity publisherEntity) { + public boolean addUserEmailMeta(UserEmailEntity userEmailEntity) { + if (getUserEmailMeta(userEmailEntity.getSiteId(), userEmailEntity.getUserId()) != null) { + throw new IllegalArgumentException("Duplicated user meta: " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId()); + } + + List entities = new ArrayList<>(1); + entities.add(userEmailEntity); + try { + queryService.insert(addUserEmailSql, entities, new UserEmailEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to insert UserEmailEntity: {}", userEmailEntity, e); + return false; + } + return true; + } + @Override + public boolean updateUserEmailMeta(UserEmailEntity entity) { + String updateSql = "update analysis_email set "; + if (entity.getUuid() != null && !entity.getUuid().isEmpty()) { + updateSql += "uuid = ?, "; + } + if (entity.getMailAddress() != null && !entity.getMailAddress().isEmpty()) { + updateSql += "mailAddress = ?, "; + } + if (entity.getCreatedTime() > 0) { + updateSql += "createdtime = ?, "; + } + if (entity.getModifiedTime() > 0) { + updateSql += "modifiedtime = ?, "; + } + updateSql = updateSql.substring(0, updateSql.length() - 2); + if (StringUtils.isNotBlank(entity.getSiteId())) { + updateSql += " where siteId = ?"; + } + if (StringUtils.isNotBlank(entity.getUserId())) { + updateSql += " and userId = ?"; + } + + try { + if (queryService.update(updateSql, entity, new UserEmailEntityToRelation()) == 0) { + LOG.warn("failed to execute {}", updateSql); + } + } catch (SQLException e) { + LOG.warn("failed to execute {}, {}", updateSql, e); + return false; + } return true; } @Override - public boolean deletePublisherMeta(String userId) { + public boolean deleteUserEmailMeta(String siteId, String userId) { + UserEmailEntity entity = new UserEmailEntity(); + entity.setSiteId(siteId); + entity.setUserId(userId); + try { + queryService.update(deleteUserEmailSql, entity, new UserEmailEntityToRelation()); + } catch (SQLException e) { + LOG.error("Error to delete UserEmailEntity: {}", entity, e); + return false; + } return true; } @Override - public List getPublisherMeta(String userId) { - return null; + public List getUserEmailMeta(String siteId, String userId) { + UserEmailEntity userEmailEntity = new UserEmailEntity(); + userEmailEntity.setSiteId(siteId); + userEmailEntity.setUserId(userId); + + List results; + try { + results = queryService.queryWithCond(getUserEmailSql, userEmailEntity, new UserEmailEntityToRelation(), new RelationToUserEmailEntity()); + } catch (SQLException e) { + LOG.error("Error to getJobMeta : {}", e); + return null; + } + if (results.isEmpty()) { + return null; + } + + return results; } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java index 85e8358313..b7582c1178 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/MetaManagementServiceMemoryImpl.java @@ -17,94 +17,105 @@ package org.apache.eagle.jpm.analyzer.meta.impl; -import com.google.inject.Inject; import com.typesafe.config.Config; import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import java.io.Serializable; import java.util.*; public class MetaManagementServiceMemoryImpl implements MetaManagementService, Serializable { private static final Logger LOG = LoggerFactory.getLogger(MetaManagementServiceMemoryImpl.class); - private final Map jobMetaEntities = new HashMap<>(); - private final Map> publisherEntities = new HashMap<>(); + private final Map> jobMetaEntities = new HashMap<>(); + private final Map> publisherEntities = new HashMap<>(); @Inject Config config; @Override public boolean addJobMeta(JobMetaEntity jobMetaEntity) { - if (jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) { - LOG.warn("contains job {} already, add job meta failed", jobMetaEntity.getJobDefId()); - return false; + if (!jobMetaEntities.containsKey(jobMetaEntity.getSiteId())) { + jobMetaEntities.put(jobMetaEntity.getSiteId(), new HashMap<>()); } - jobMetaEntities.put(jobMetaEntity.getJobDefId(), jobMetaEntity); + jobMetaEntities.get(jobMetaEntity.getSiteId()).put(jobMetaEntity.getJobDefId(), jobMetaEntity); LOG.info("Successfully add job {} meta", jobMetaEntity.getJobDefId()); return true; } @Override - public boolean updateJobMeta(String jobDefId, JobMetaEntity jobMetaEntity) { - if (!jobMetaEntities.containsKey(jobMetaEntity.getJobDefId())) { - LOG.warn("does not contain job {}, update job meta failed", jobDefId); + public boolean updateJobMeta(JobMetaEntity jobMetaEntity) { + if (!jobMetaEntities.containsKey(jobMetaEntity.getSiteId())) { + LOG.warn("does not contain siteId {}, update job meta failed", jobMetaEntity.getSiteId()); return false; } - jobMetaEntities.put(jobDefId, jobMetaEntity); - LOG.info("Successfully update job {} meta", jobDefId); + jobMetaEntities.get(jobMetaEntity.getSiteId()).put(jobMetaEntity.getJobDefId(), jobMetaEntity); + LOG.info("Successfully update job {} meta", jobMetaEntity.getJobDefId()); return true; } @Override - public List getJobMeta(String jobDefId) { - if (!jobMetaEntities.containsKey(jobDefId)) { - LOG.warn("does not contain job {}, get job meta failed", jobDefId); + public List getJobMeta(String siteId, String jobDefId) { + if (!jobMetaEntities.containsKey(siteId)) { + LOG.warn("does not contain site {}, get job meta failed", siteId); return new ArrayList<>(); } - return Arrays.asList(jobMetaEntities.get(jobDefId)); + return Arrays.asList(jobMetaEntities.get(siteId).get(jobDefId)); } @Override - public boolean deleteJobMeta(String jobDefId) { - if (!jobMetaEntities.containsKey(jobDefId)) { - LOG.warn("does not contain job {}, delete job meta failed", jobDefId); + public boolean deleteJobMeta(String siteId, String jobDefId) { + if (!jobMetaEntities.containsKey(siteId)) { + LOG.warn("does not contain siteId {}, delete job meta failed", siteId); return false; } - jobMetaEntities.remove(jobDefId); + jobMetaEntities.get(siteId).remove(jobDefId); LOG.info("Successfully delete job {} meta", jobDefId); return true; } @Override - public boolean addPublisherMeta(PublisherEntity publisherEntity) { - if (publisherEntities.containsKey(publisherEntity.getUserId())) { - for (PublisherEntity entity : publisherEntities.get(publisherEntity.getUserId())) { - if (entity.equals(publisherEntity)) { + public boolean addUserEmailMeta(UserEmailEntity userEmailEntity) { + if (publisherEntities.containsKey(userEmailEntity.getSiteId())) { + for (UserEmailEntity entity : publisherEntities.get(userEmailEntity.getSiteId()).values()) { + if (entity.equals(userEmailEntity)) { LOG.warn("contains user {}, mailAddress {} already, add publisher failed", entity.getUserId(), entity.getMailAddress()); return false; } } } - if (!publisherEntities.containsKey(publisherEntity.getUserId())) { - publisherEntities.put(publisherEntity.getUserId(), new ArrayList<>()); + if (!publisherEntities.containsKey(userEmailEntity.getSiteId())) { + publisherEntities.put(userEmailEntity.getSiteId(), new HashMap<>()); } - publisherEntities.get(publisherEntity.getUserId()).add(publisherEntity); - LOG.info("Successfully add publisher user {}, mailAddress {}", publisherEntity.getUserId(), publisherEntity.getMailAddress()); + publisherEntities.get(userEmailEntity.getSiteId()).put(userEmailEntity.getUserId(), userEmailEntity); + LOG.info("Successfully add publisher user {}, mailAddress {}", userEmailEntity.getUserId(), userEmailEntity.getMailAddress()); return true; } @Override - public boolean deletePublisherMeta(String userId) { + public boolean updateUserEmailMeta(UserEmailEntity userEmailEntity) { + if (!publisherEntities.containsKey(userEmailEntity.getSiteId())) { + LOG.warn("does not contain siteId {}, update user email meta failed", userEmailEntity.getSiteId()); + return false; + } + + publisherEntities.get(userEmailEntity.getSiteId()).put(userEmailEntity.getUserId(), userEmailEntity); + LOG.info("Successfully update user {} meta", userEmailEntity.getUserId()); + return true; + } + + @Override + public boolean deleteUserEmailMeta(String siteId, String userId) { if (!publisherEntities.containsKey(userId)) { LOG.warn("does not contain user {}, failed to delete publisher", userId); return false; @@ -116,12 +127,12 @@ public boolean deletePublisherMeta(String userId) { } @Override - public List getPublisherMeta(String userId) { - if (!publisherEntities.containsKey(userId)) { - LOG.warn("does not contain user {}, failed to get publisher", userId); + public List getUserEmailMeta(String siteId, String userId) { + if (!publisherEntities.containsKey(siteId)) { + LOG.warn("does not contain siteId {}, failed to get publisher", siteId); return new ArrayList<>(); } - return publisherEntities.get(userId); + return Arrays.asList(publisherEntities.get(siteId).get(userId)); } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java new file mode 100644 index 0000000000..5053b50c71 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/JobMetaEntityToRelation.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + +import org.apache.commons.lang.StringUtils; +import org.apache.eagle.common.function.ThrowableConsumer2; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class JobMetaEntityToRelation implements ThrowableConsumer2 { + @Override + public void accept(PreparedStatement statement, JobMetaEntity entity) throws SQLException { + int parameterIndex = 1; + if (entity.getUuid() != null && StringUtils.isNotBlank(entity.getUuid())) { + statement.setString(parameterIndex, entity.getUuid()); + parameterIndex++; + } + if (entity.getConfiguration() != null) { + statement.setString(parameterIndex, JSONObject.toJSONString(entity.getConfiguration())); + parameterIndex++; + } + if (entity.getEvaluators() != null) { + statement.setString(parameterIndex, JSONArray.toJSONString(entity.getEvaluators())); + parameterIndex++; + } + if (entity.getCreatedTime() > 0) { + statement.setLong(parameterIndex, entity.getCreatedTime()); + parameterIndex++; + } + if (entity.getModifiedTime() > 0) { + statement.setLong(parameterIndex, entity.getModifiedTime()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getSiteId())) { + statement.setString(parameterIndex, entity.getSiteId()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getJobDefId())) { + statement.setString(parameterIndex, entity.getJobDefId()); + parameterIndex++; + } + } +} diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java new file mode 100644 index 0000000000..ec2a59bba4 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + +import org.apache.eagle.common.function.ThrowableFunction; +import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map; + + +public class RelationToJobMetaEntity implements ThrowableFunction { + private static final Logger LOG = LoggerFactory.getLogger(RelationToJobMetaEntity.class); + + @Override + public JobMetaEntity apply(ResultSet resultSet) throws SQLException { + JobMetaEntity jobMetaEntity = new JobMetaEntity(); + jobMetaEntity.setUuid(resultSet.getString(1)); + jobMetaEntity.setJobDefId(resultSet.getString(2)); + jobMetaEntity.setSiteId(resultSet.getString(3)); + jobMetaEntity.setConfiguration(parse(resultSet.getString(4))); + jobMetaEntity.setEvaluators(new ArrayList<>()); + try { + JSONArray jsonArray = new JSONArray(resultSet.getString(5)); + for (int i = 0; i < jsonArray.length(); ++i) { + jobMetaEntity.getEvaluators().add(jsonArray.getString(i)); + } + } catch (Exception e) { + LOG.warn("{}", e); + } + jobMetaEntity.setCreatedTime(resultSet.getLong(6)); + jobMetaEntity.setModifiedTime(resultSet.getLong(7)); + + return jobMetaEntity; + } + + private Map parse(String field) { + Map items = new java.util.HashMap<>(); + try { + JSONObject jsonObject = new JSONObject(field); + + Iterator keyItemItr = jsonObject.keys(); + while (keyItemItr.hasNext()) { + String itemKey = keyItemItr.next(); + items.put(itemKey, jsonObject.get(itemKey)); + } + + } catch (Exception e) { + LOG.warn("{}", e); + } + + return items; + } +} diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java new file mode 100644 index 0000000000..ec86506446 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToUserEmailEntity.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + +import org.apache.eagle.common.function.ThrowableFunction; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; + +import java.sql.ResultSet; +import java.sql.SQLException; + +public class RelationToUserEmailEntity implements ThrowableFunction { + @Override + public UserEmailEntity apply(ResultSet resultSet) throws SQLException { + UserEmailEntity userEmailEntity = new UserEmailEntity(); + userEmailEntity.setUuid(resultSet.getString(1)); + userEmailEntity.setUserId(resultSet.getString(2)); + userEmailEntity.setSiteId(resultSet.getString(3)); + userEmailEntity.setMailAddress(resultSet.getString(4)); + userEmailEntity.setCreatedTime(resultSet.getLong(5)); + userEmailEntity.setModifiedTime(resultSet.getLong(6)); + return userEmailEntity; + } +} diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java new file mode 100644 index 0000000000..29958b0c92 --- /dev/null +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/UserEmailEntityToRelation.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.jpm.analyzer.meta.impl.orm; + + +import org.apache.commons.lang.StringUtils; +import org.apache.eagle.common.function.ThrowableConsumer2; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; + +import java.sql.PreparedStatement; +import java.sql.SQLException; + +public class UserEmailEntityToRelation implements ThrowableConsumer2 { + @Override + public void accept(PreparedStatement statement, UserEmailEntity entity) throws SQLException { + int parameterIndex = 1; + if (StringUtils.isNotBlank(entity.getUuid())) { + statement.setString(parameterIndex, entity.getUuid()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getMailAddress())) { + statement.setString(parameterIndex, entity.getMailAddress()); + parameterIndex++; + } + if (entity.getCreatedTime() > 0) { + statement.setLong(parameterIndex, entity.getCreatedTime()); + parameterIndex++; + } + if (entity.getModifiedTime() > 0) { + statement.setLong(parameterIndex, entity.getModifiedTime()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getSiteId())) { + statement.setString(parameterIndex, entity.getSiteId()); + parameterIndex++; + } + if (StringUtils.isNotBlank(entity.getUserId())) { + statement.setString(parameterIndex, entity.getUserId()); + parameterIndex++; + } + } +} diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java index 189d85dba1..94971407de 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/AnalyzerEntity.java @@ -38,7 +38,7 @@ public class AnalyzerEntity { private Map jobConfig = new HashMap<>(); - private Map jobMeta = new HashMap<>(); + private JobMetaEntity jobMeta; public String getJobDefId() { return jobDefId; @@ -112,11 +112,11 @@ public void setJobConfig(Map jobConfig) { this.jobConfig = jobConfig; } - public Map getJobMeta() { + public JobMetaEntity getJobMeta() { return jobMeta; } - public void setJobMeta(Map jobMeta) { + public void setJobMeta(JobMetaEntity jobMeta) { this.jobMeta = jobMeta; } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java index 2e15c17484..90605ce4ac 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java @@ -17,20 +17,18 @@ package org.apache.eagle.jpm.analyzer.meta.model; +import org.apache.eagle.jpm.analyzer.util.Constants; import org.apache.eagle.metadata.persistence.PersistenceEntity; import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; +import java.util.*; @JsonIgnoreProperties(ignoreUnknown = true) public class JobMetaEntity extends PersistenceEntity { private String jobDefId; private String siteId; - private Map configuration = new HashMap<>(); - private Set evaluators = new HashSet<>(); + private Map configuration; + private List evaluators; public JobMetaEntity() { @@ -39,7 +37,7 @@ public JobMetaEntity() { public JobMetaEntity(String jobDefId, String siteId, Map configuration, - Set evaluators) { + List evaluators) { this.jobDefId = jobDefId; this.siteId = siteId; this.configuration = configuration; @@ -75,11 +73,11 @@ public void setConfiguration(Map configuration) { this.configuration = configuration; } - public Set getEvaluators() { + public List getEvaluators() { return evaluators; } - public void setEvaluators(Set evaluators) { + public void setEvaluators(List evaluators) { this.evaluators = evaluators; } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java similarity index 71% rename from eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java rename to eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java index bca7ab13b1..cbac4d0bed 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/PublisherEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/UserEmailEntity.java @@ -22,24 +22,37 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties; @JsonIgnoreProperties(ignoreUnknown = true) -public class PublisherEntity extends PersistenceEntity { +public class UserEmailEntity extends PersistenceEntity { private String userId; + private String siteId; private String mailAddress; - public PublisherEntity(String userId, String mailAddress) { + public UserEmailEntity() { + } + + public UserEmailEntity(String userId, String siteId, String mailAddress) { this.userId = userId; + this.siteId = siteId; this.mailAddress = mailAddress; } @Override public String toString() { - return String.format("PublisherEntity[userId=%s, mailAddress=%s]", userId, mailAddress); + return String.format("UserEmailEntity[userId=%s, siteId=%s, mailAddress=%s]", userId, siteId, mailAddress); } public String getUserId() { return userId; } + public void setSiteId(String siteId) { + this.siteId = siteId; + } + + public String getSiteId() { + return siteId; + } + public void setUserId(String userId) { this.userId = userId; } @@ -56,6 +69,7 @@ public void setMailAddress(String mailAddress) { public int hashCode() { return new HashCodeBuilder() .append(userId) + .append(siteId) .append(mailAddress) .build(); } @@ -66,12 +80,12 @@ public boolean equals(Object that) { return true; } - if (!(that instanceof PublisherEntity)) { + if (!(that instanceof UserEmailEntity)) { return false; } - PublisherEntity another = (PublisherEntity)that; + UserEmailEntity another = (UserEmailEntity)that; - return another.userId.equals(this.userId) && another.mailAddress.equals(this.mailAddress); + return another.userId.equals(this.userId) && another.siteId.equals(this.siteId) && another.mailAddress.equals(this.mailAddress); } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java index 57e1765d19..e32a37c0f4 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java @@ -48,7 +48,7 @@ public MRJobPerformanceAnalyzer(Config config) { evaluators.add(new JobSuggestionEvaluator(config)); publishers.add(new EagleStorePublisher(config)); - //publishers.add(new EmailPublisher(config)); + publishers.add(new EmailPublisher(config)); } @Override diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java index a77e55de2a..13bad50af8 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java @@ -48,20 +48,16 @@ public SLAJobEvaluator(Config config) { @Override public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) { - if (!analyzerJobEntity.getCurrentState().equalsIgnoreCase(Constants.JobState.RUNNING.toString())) { - return null; - } - Result.EvaluatorResult result = new Result.EvaluatorResult(); - List jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getJobDefId()); + List jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getJobDefId()); if (jobMetaEntities.size() == 0 - || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getName())) { + || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getSimpleName())) { LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId()); return result; } - analyzerJobEntity.setJobMeta(jobMetaEntities.get(0).getConfiguration()); + analyzerJobEntity.setJobMeta(jobMetaEntities.get(0)); for (Processor processor : processors) { result.addProcessorResult(processor.getClass(), processor.process(analyzerJobEntity)); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index f7748f84a4..116ac2c485 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -47,7 +47,7 @@ public UnExpectedLongDurationJobProcessor(Config config) { public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); - Map jobMetaData = analyzerJobEntity.getJobMeta(); + Map jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration(); long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); if (avgDurationTime == 0L) { return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index 842e0ac35c..65a36637ef 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -18,17 +18,22 @@ package org.apache.eagle.jpm.analyzer.publisher; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import org.apache.eagle.app.service.ApplicationEmailService; import org.apache.eagle.common.DateTimeUtil; +import org.apache.eagle.common.mail.AlertEmailConstants; import org.apache.eagle.common.mail.AlertEmailContext; import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; import org.apache.eagle.jpm.analyzer.util.Constants; +import org.apache.eagle.jpm.analyzer.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,6 +50,7 @@ public EmailPublisher(Config config) { } @Override + //will refactor, just work now public void publish(AnalyzerEntity analyzerJobEntity, Result result) { if (result.getAlertMessages().size() == 0) { return; @@ -82,8 +88,16 @@ public void publish(AnalyzerEntity analyzerJobEntity, Result result) { alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic); alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend); - //TODO, override email config in job meta data - ApplicationEmailService emailService = new ApplicationEmailService(config, Constants.ANALYZER_REPORT_CONFIG_PATH); + Config cloneConfig = ConfigFactory.empty().withFallback(config); + if (analyzerJobEntity.getUserId() != null) { + List users = Utils.getUserMail(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getUserId()); + if (users.size() > 0) { + Map additionalConfig = Collections.emptyMap(); + additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.SENDER, users.get(0).getMailAddress()); + cloneConfig = ConfigFactory.parseMap(additionalConfig).withFallback(cloneConfig); + } + } + ApplicationEmailService emailService = new ApplicationEmailService(cloneConfig, Constants.ANALYZER_REPORT_CONFIG_PATH); String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId()); AlertEmailContext alertContext = emailService.buildEmailContext(subject); emailService.onAlert(alertContext, alertData); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java index b139b3c250..365f60ad1d 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -39,8 +39,8 @@ public class SimpleDeduplicator implements AlertDeduplicator, Serializable { @Override public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) { long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; - if (analyzerJobEntity.getJobMeta().containsKey(Constants.DEDUP_INTERVAL_KEY)) { - dedupInterval = (Long)analyzerJobEntity.getJobMeta().get(Constants.DEDUP_INTERVAL_KEY); + if (analyzerJobEntity.getJobMeta().getConfiguration().containsKey(Constants.DEDUP_INTERVAL_KEY)) { + dedupInterval = (Long)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY); } dedupInterval = dedupInterval * 1000; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java index 80d9fb7678..635d7ab00e 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java @@ -20,8 +20,9 @@ import com.google.inject.Inject; import org.apache.eagle.common.rest.RESTResponse; import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; -import org.apache.eagle.jpm.analyzer.meta.model.PublisherEntity; +import org.apache.eagle.jpm.analyzer.util.Constants; import javax.ws.rs.*; import javax.ws.rs.core.MediaType; @@ -38,16 +39,16 @@ public AnalyzerResource() { } @POST - @Path(META_PATH) + @Path(JOB_META_ROOT_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public RESTResponse addJobMeta(JobMetaEntity jobMetaEntity) { return RESTResponse.async((response) -> { jobMetaEntity.ensureDefault(); boolean ret = metaManagementService.addJobMeta(jobMetaEntity); - String message = "Successfully add job meta for " + jobMetaEntity.getJobDefId(); + String message = "Successfully add job meta for " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId(); if (!ret) { - message = "Failed to add job meta for " + jobMetaEntity.getJobDefId(); + message = "Failed to add job meta for " + jobMetaEntity.getSiteId() + ": " + jobMetaEntity.getJobDefId(); } response.success(ret).message(message); }).get(); @@ -56,13 +57,17 @@ public RESTResponse addJobMeta(JobMetaEntity jobMetaEntity) { @POST @Path(JOB_META_PATH) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId, JobMetaEntity jobMetaEntity) { + public RESTResponse updateJobMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.JOB_DEF_ID) String jobDefId, + JobMetaEntity jobMetaEntity) { return RESTResponse.async((response) -> { - jobMetaEntity.ensureDefault(); - boolean ret = metaManagementService.updateJobMeta(jobDefId, jobMetaEntity); - String message = "Successfully update job meta for " + jobDefId; + jobMetaEntity.setModifiedTime(System.currentTimeMillis()); + jobMetaEntity.setSiteId(siteId); + jobMetaEntity.setJobDefId(jobDefId); + boolean ret = metaManagementService.updateJobMeta(jobMetaEntity); + String message = "Successfully update job meta for " + siteId + ":" + jobDefId; if (!ret) { - message = "Failed to update job meta for " + jobDefId; + message = "Failed to update job meta for " + siteId + ":" + jobDefId; } response.success(ret).message(message); }).get(); @@ -71,20 +76,22 @@ public RESTResponse updateJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId @GET @Path(JOB_META_PATH) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse> getJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) { - return RESTResponse.async(() -> metaManagementService.getJobMeta(jobDefId)).get(); + public RESTResponse> getJobMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.JOB_DEF_ID) String jobDefId) { + return RESTResponse.async(() -> metaManagementService.getJobMeta(siteId, jobDefId)).get(); } @DELETE @Path(JOB_META_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId) { + public RESTResponse deleteJobMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.JOB_DEF_ID) String jobDefId) { return RESTResponse.async((response) -> { - boolean ret = metaManagementService.deleteJobMeta(jobDefId); - String message = "Successfully delete job meta for " + jobDefId; + boolean ret = metaManagementService.deleteJobMeta(siteId, jobDefId); + String message = "Successfully delete job meta for " + siteId + ": " + jobDefId; if (!ret) { - message = "Failed to delete job meta for " + jobDefId; + message = "Failed to delete job meta for " + siteId + ": " + jobDefId; } response.success(ret).message(message); @@ -92,40 +99,62 @@ public RESTResponse deleteJobMeta(@PathParam(JOB_DEF_PATH) String jobDefId } @POST - @Path(PUBLISHER_PATH) + @Path(USER_META_ROOT_PATH) + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public RESTResponse addEmailPublisherMeta(UserEmailEntity userEmailEntity) { + return RESTResponse.async((response) -> { + userEmailEntity.ensureDefault(); + boolean ret = metaManagementService.addUserEmailMeta(userEmailEntity); + String message = "Successfully add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); + if (!ret) { + message = "Failed to add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); + } + response.success(ret).message(message); + }).get(); + } + + @POST + @Path(USER_META_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse addPublisherMeta(PublisherEntity publisherEntity) { + public RESTResponse updateEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.USER_ID) String userId, + UserEmailEntity userEmailEntity) { return RESTResponse.async((response) -> { - publisherEntity.ensureDefault(); - boolean ret = metaManagementService.addPublisherMeta(publisherEntity); - String message = "Successfully add publisher meta for " + publisherEntity.getUserId(); + userEmailEntity.setSiteId(siteId); + userEmailEntity.setUserId(userId); + userEmailEntity.setModifiedTime(System.currentTimeMillis()); + boolean ret = metaManagementService.updateUserEmailMeta(userEmailEntity); + String message = "Successfully add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); if (!ret) { - message = "Failed to add publisher meta for " + publisherEntity.getUserId(); + message = "Failed to add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); } response.success(ret).message(message); }).get(); } @DELETE - @Path(PUBLISHER_META_PATH) + @Path(USER_META_PATH) @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse deletePublisherMeta(@PathParam(USER_PATH) String userId) { + public RESTResponse deleteEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.USER_ID) String userId) { return RESTResponse.async((response) -> { - boolean ret = metaManagementService.deletePublisherMeta(userId); - String message = "Successfully delete publisher meta for " + userId; + boolean ret = metaManagementService.deleteUserEmailMeta(siteId, userId); + String message = "Successfully delete user meta for " + siteId + ":" + userId; if (!ret) { - message = "Failed to delete publisher meta for " + userId; + message = "Failed to delete user meta for " + siteId + ":" + userId; } response.success(ret).message(message); }).get(); } @GET - @Path(PUBLISHER_META_PATH) + @Path(USER_META_PATH) @Produces(MediaType.APPLICATION_JSON) - public RESTResponse> getPublisherMeta(@PathParam(USER_PATH) String userId) { - return RESTResponse.async(() -> metaManagementService.getPublisherMeta(userId)).get(); + public RESTResponse> getEmailPublisherMeta(@PathParam(Constants.SITE_ID) String siteId, + @PathParam(Constants.USER_ID) String userId) { + return RESTResponse.async(() -> metaManagementService.getUserEmailMeta(siteId, userId)).get(); } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java index 4c6661a43a..933a4afc51 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java @@ -31,14 +31,16 @@ public class Constants { public static final String CONTEXT_PATH = "service.context"; public static final String READ_TIMEOUT_PATH = "service.readTimeOutSeconds"; - public static final String META_PATH = "/metadata"; - public static final String ANALYZER_PATH = "/job/analyzer"; - public static final String JOB_DEF_PATH = "jobDefId"; - public static final String JOB_META_PATH = META_PATH + "/{" + JOB_DEF_PATH + "}"; + public static final String ANALYZER_PATH = "/analyzer"; - public static final String PUBLISHER_PATH = "/publisher"; - public static final String USER_PATH = "userId"; - public static final String PUBLISHER_META_PATH = PUBLISHER_PATH + "/{" + USER_PATH + "}"; + public static final String SITE_ID = "siteId"; + public static final String JOB_META_ROOT_PATH = "/jobmeta"; + public static final String JOB_DEF_ID = "jobDefId"; + public static final String JOB_META_PATH = JOB_META_ROOT_PATH + "/{" + SITE_ID + "}/" + "{" + JOB_DEF_ID + "}"; + + public static final String USER_META_ROOT_PATH = "/usermeta"; + public static final String USER_ID = "userId"; + public static final String USER_META_PATH = USER_META_ROOT_PATH + "/{" + SITE_ID + "}/" + "{" + USER_ID + "}"; public static final String PROCESS_NONE = "PROCESS_NONE"; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java index a987bd8bac..e406adfdf6 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java @@ -21,6 +21,7 @@ import com.typesafe.config.Config; import org.apache.eagle.common.rest.RESTResponse; import org.apache.eagle.jpm.analyzer.meta.model.JobMetaEntity; +import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.ObjectMapper; @@ -40,7 +41,7 @@ public class Utils { OBJ_MAPPER.configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, true); } - public static List getJobMeta(Config config, String jobDefId) { + public static List getJobMeta(Config config, String siteId, String jobDefId) { List result = new ArrayList<>(); String url = "http://" + config.getString(Constants.HOST_PATH) @@ -48,7 +49,9 @@ public static List getJobMeta(Config config, String jobDefId) { + config.getInt(Constants.PORT_PATH) + config.getString(Constants.CONTEXT_PATH) + Constants.ANALYZER_PATH - + Constants.META_PATH + + Constants.JOB_META_ROOT_PATH + + "/" + + siteId + "/" + URLEncoder.encode(jobDefId); @@ -65,6 +68,33 @@ public static List getJobMeta(Config config, String jobDefId) { } } + public static List getUserMail(Config config, String siteId, String userId) { + List result = new ArrayList<>(); + String url = "http://" + + config.getString(Constants.HOST_PATH) + + ":" + + config.getInt(Constants.PORT_PATH) + + config.getString(Constants.CONTEXT_PATH) + + Constants.ANALYZER_PATH + + Constants.USER_META_ROOT_PATH + + "/" + + siteId + + "/" + + URLEncoder.encode(userId); + + InputStream is = null; + try { + is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE); + LOG.info("get user meta from {}", url); + result = (List)OBJ_MAPPER.readValue(is, RESTResponse.class).getData(); + } catch (Exception e) { + LOG.warn("failed to get user meta from {}", url, e); + } finally { + org.apache.eagle.jpm.util.Utils.closeInputStream(is); + return result; + } + } + public static > List> sortByValue(Map map) { List> list = new LinkedList<>(map.entrySet()); Collections.sort(list, (e1, e2) -> e1.getValue().compareTo(e2.getValue())); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql index 78fd02af13..07e820ba87 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql @@ -16,26 +16,22 @@ -- * -- */ -CREATE TABLE IF NOT EXISTS jobs ( - jobDefId VARCHAR(50) NOT NULL, - configuration MEDIUMTEXT NOT NULL, +CREATE TABLE IF NOT EXISTS analysis_jobs ( + uuid varchar(50) PRIMARY KEY, + jobDefId varchar(50) NOT NULL, + siteId varchar(50) NOT NULL, + configuration mediumtext NOT NULL, + evaluators mediumtext NOT NULL, createdtime bigint(20) DEFAULT NULL, modifiedtime bigint(20) DEFAULT NULL, - PRIMARY KEY (jobDefId) + UNIQUE (jobDefId) ); -CREATE TABLE IF NOT EXISTS job_evaluators ( - jobDefId VARCHAR(50) NOT NULL, - evaluator VARCHAR(100) NOT NULL, - createdtime bigint(20) DEFAULT NULL, - modifiedtime bigint(20) DEFAULT NULL, - PRIMARY KEY (jobDefId, evaluator) -); - -CREATE TABLE IF NOT EXISTS job_publishments ( - userId VARCHAR(100) PRIMARY KEY, +CREATE TABLE IF NOT EXISTS analysis_email ( + uuid varchar(50) PRIMARY KEY, + userId varchar(100) NOT NULL, mailAddress mediumtext NOT NULL, createdtime bigint(20) DEFAULT NULL, modifiedtime bigint(20) DEFAULT NULL, - PRIMARY KEY (userId) + UNIQUE (userId) ); \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java index 8751e73e88..30c63a80eb 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/MRHistoryJobApplicationProvider.java @@ -19,11 +19,14 @@ import com.codahale.metrics.health.HealthCheck; import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; -import io.dropwizard.lifecycle.Managed; import org.apache.eagle.app.service.ApplicationListener; import org.apache.eagle.app.spi.AbstractApplicationProvider; +import org.apache.eagle.jpm.analyzer.meta.MetaManagementService; +import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceJDBCImpl; +import org.apache.eagle.jpm.analyzer.meta.impl.MetaManagementServiceMemoryImpl; +import org.apache.eagle.metadata.service.memory.MemoryMetadataStore; +import org.apache.eagle.metadata.store.jdbc.JDBCMetadataStore; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -52,4 +55,10 @@ public Optional> getSharedServices(Config envConfig) { return Optional.empty(); } } + + @Override + protected void onRegister() { + bind(MemoryMetadataStore.class, MetaManagementService.class, MetaManagementServiceMemoryImpl.class); + bind(JDBCMetadataStore.class, MetaManagementService.class, MetaManagementServiceJDBCImpl.class); + } } \ No newline at end of file From 8a14c7bf4ceb5f89d41076117c96c67bc2688f3d Mon Sep 17 00:00:00 2001 From: wujinhu Date: Sat, 4 Mar 2017 15:09:17 +0800 Subject: [PATCH 02/11] add jdbc storage support for sla job meta --- .../analyzer/meta/model/JobMetaEntity.java | 1 - .../jpm/analyzer/mr/sla/SLAJobEvaluator.java | 2 +- .../UnExpectedLongDurationJobProcessor.java | 2 +- .../mr/suggestion/JobSuggestionEvaluator.java | 10 + .../MapReduceCompressionSettingProcessor.java | 2 +- .../MapReduceDataSkewProcessor.java | 2 +- .../suggestion/MapReduceGCTimeProcessor.java | 2 +- .../MapReduceQueueResourceProcessor.java | 2 +- .../suggestion/MapReduceSpillProcessor.java | 2 +- .../MapReduceSplitSettingProcessor.java | 2 +- .../suggestion/MapReduceTaskNumProcessor.java | 2 +- .../analyzer/publisher/EmailPublisher.java | 27 ++- .../eagle/jpm/analyzer/publisher/Result.java | 17 +- .../analyzer/resource/AnalyzerResource.java | 4 +- .../eagle/jpm/analyzer/util/Constants.java | 2 +- .../apache/eagle/jpm/analyzer/util/Utils.java | 6 +- .../main/resources/AnalyzerReportTemplate.vm | 219 ++++++++++++++---- .../src/main/resources/createTable.sql | 7 +- .../history/crawler/JHFCrawlerDriverImpl.java | 2 +- .../history/parser/JobSuggestionListener.java | 9 +- .../jpm/mr/history/storm/JobHistorySpout.java | 2 +- 21 files changed, 238 insertions(+), 86 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java index 90605ce4ac..8d4af8efa6 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/model/JobMetaEntity.java @@ -17,7 +17,6 @@ package org.apache.eagle.jpm.analyzer.meta.model; -import org.apache.eagle.jpm.analyzer.util.Constants; import org.apache.eagle.metadata.persistence.PersistenceEntity; import org.codehaus.jackson.annotate.JsonIgnoreProperties; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java index 13bad50af8..ec7a641bdb 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/SLAJobEvaluator.java @@ -51,7 +51,7 @@ public Result.EvaluatorResult evaluate(AnalyzerEntity analyzerJobEntity) { Result.EvaluatorResult result = new Result.EvaluatorResult(); List jobMetaEntities = Utils.getJobMeta(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getJobDefId()); - if (jobMetaEntities.size() == 0 + if (jobMetaEntities == null || jobMetaEntities.size() == 0 || !jobMetaEntities.get(0).getEvaluators().contains(this.getClass().getSimpleName())) { LOG.info("SLAJobEvaluator skip job {}", analyzerJobEntity.getJobDefId()); return result; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index 116ac2c485..7e74954869 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -62,7 +62,7 @@ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime; for (Map.Entry entry : sorted) { if (expirePercent >= entry.getValue()) { - return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration by %d%%, average duration is %ds", + return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration(calculated by historical executions of this job) by %d%%, average duration is %ds", (int)(expirePercent * 100), avgDurationTime / 1000)); } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java index ea60ff9667..e1a357a9ee 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/JobSuggestionEvaluator.java @@ -63,6 +63,16 @@ public Result.EvaluatorResult evaluate(MapReduceAnalyzerEntity analyzerEntity) { return null; } + + if (analyzerEntity.getTotalCounters() == null) { + LOG.warn("Total counters of Job {} is null", analyzerEntity.getJobId()); + return null; + } + if (analyzerEntity.getMapCounters() == null && analyzerEntity.getReduceCounters() == null) { + LOG.warn("Map/Reduce task counters of Job {} are null", analyzerEntity.getJobId()); + return null; + } + MapReduceJobSuggestionContext jobContext = new MapReduceJobSuggestionContext(analyzerEntity); if (jobContext.getNumMaps() == 0) { return null; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java index 62c5c2bf3c..8638376e33 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java @@ -75,7 +75,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.NOTICE, sb.toString(), optSettings); } return null; } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java index b21a927ba2..2d1611bc6b 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java @@ -53,7 +53,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.INFO, sb.toString()); + return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.NOTICE, sb.toString()); } } catch (NullPointerException e) { // When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java index 4007747543..30fb68fb4c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java @@ -68,7 +68,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.INFO, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.NOTICE, sb.toString(), optSettings); } } catch (NullPointerException e) { // When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java index a1b57bf4ba..ad140fbebf 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java @@ -70,7 +70,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.INFO, sb.toString()); + return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.NOTICE, sb.toString()); } } } catch (Exception e) { diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java index 96be2d5033..6bc69d62d7 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java @@ -100,7 +100,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.INFO, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.NOTICE, sb.toString(), optSettings); } } catch (NullPointerException e) { //When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java index 8eba4687a2..5be8e7b83b 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java @@ -40,7 +40,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) if (context.getJobconf().getLong(FileInputFormat.SPLIT_MINSIZE, 0) > 1) { sb.append("Best practice: don't set " + FileInputFormat.SPLIT_MINSIZE); sb.append(", because it may lower data locality, hence maps will run slower.\n"); - return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.INFO, sb.toString()); + return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.NOTICE, sb.toString()); } return null; } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java index 00d5cc988f..024ae00b71 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java @@ -45,7 +45,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) sb.append(analyzeMapTaskNum(optSettings)); if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.INFO, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.NOTICE, sb.toString(), optSettings); } } catch (NullPointerException e) { // When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index 65a36637ef..e7020ee775 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -19,6 +19,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import org.apache.eagle.alert.engine.publisher.PublishConstants; import org.apache.eagle.app.service.ApplicationEmailService; import org.apache.eagle.common.DateTimeUtil; import org.apache.eagle.common.mail.AlertEmailConstants; @@ -77,28 +78,29 @@ public void publish(AnalyzerEntity analyzerJobEntity, Result result) { basic.put("detail", getJobLink(analyzerJobEntity)); Map> extend = result.getAlertMessages(); + Map alertData = new HashMap<>(); for (String evaluator : extend.keySet()) { for (Result.ProcessorResult message : extend.get(evaluator)) { + setAlertLevel(alertData, message.getResultLevel()); LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", analyzerJobEntity.getJobDefId(), message.getMessage(), message.getResultLevel(), evaluator); } } - Map alertData = new HashMap<>(); alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic); alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend); - Config cloneConfig = ConfigFactory.empty().withFallback(config); if (analyzerJobEntity.getUserId() != null) { List users = Utils.getUserMail(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getUserId()); - if (users.size() > 0) { - Map additionalConfig = Collections.emptyMap(); - additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.SENDER, users.get(0).getMailAddress()); + if (users != null && users.size() > 0) { + Map additionalConfig = new HashMap<>(); + additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.RECIPIENTS, users.get(0).getMailAddress()); cloneConfig = ConfigFactory.parseMap(additionalConfig).withFallback(cloneConfig); } } ApplicationEmailService emailService = new ApplicationEmailService(cloneConfig, Constants.ANALYZER_REPORT_CONFIG_PATH); String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId()); + alertData.put(PublishConstants.ALERT_EMAIL_SUBJECT, subject); AlertEmailContext alertContext = emailService.buildEmailContext(subject); emailService.onAlert(alertContext, alertData); } @@ -113,4 +115,19 @@ private String getJobLink(AnalyzerEntity analyzerJobEntity) { + "/jpm/detail/" + analyzerJobEntity.getJobId(); } + + private void setAlertLevel(Map alertData, Result.ResultLevel level) { + if (!alertData.containsKey(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY)) { + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, Result.ResultLevel.NOTICE.toString()); + } + + if (level.equals(Result.ResultLevel.CRITICAL)) { + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString()); + } + + if (level.equals(Result.ResultLevel.WARNING) + && !alertData.get(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY).equals(Result.ResultLevel.CRITICAL.toString())) { + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, level.toString()); + } + } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java index 7d7442b6ae..4ea46b260c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -47,7 +47,7 @@ public void addEvaluatorResult(Class type, EvaluatorResult result) { alertMessages.put(typeName, new ArrayList<>()); alertEntities.put(typeName, new ArrayList<>()); } - normalizeResult(processorResult); + //normalizeResult(processorResult); alertMessages.get(typeName).add(processorResult); alertEntities.get(typeName).add(processorEntities.get(processorType)); @@ -63,11 +63,11 @@ public Map> getAlertEntities() { } private void normalizeResult(ProcessorResult processorResult) { - String settingList = ""; + /*String settingList = ""; if (processorResult.getSettings() != null && !processorResult.getSettings().isEmpty()) { settingList = StringUtils.join(processorResult.getSettings(), "\n"); } - processorResult.setSettingList(settingList); + processorResult.setSettingList(settingList);*/ } /** @@ -76,7 +76,6 @@ private void normalizeResult(ProcessorResult processorResult) { public enum ResultLevel { NONE, - INFO, NOTICE, WARNING, CRITICAL @@ -100,20 +99,20 @@ public static class ProcessorResult { private ResultLevel resultLevel; private String message; private List settings; - private String settingList; + //private String settingList; public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message, List settings) { this.ruleType = ruleType; this.resultLevel = resultLevel; this.message = message; - this.settings = settings; + //this.settings = settings; } public ProcessorResult(RuleType ruleType, ResultLevel resultLevel, String message) { this.ruleType = ruleType; this.resultLevel = resultLevel; this.message = message; - this.settings = new ArrayList<>(); + //this.settings = new ArrayList<>(); } public RuleType getRuleType() { @@ -148,13 +147,13 @@ public void setSettings(List settings) { this.settings = settings; } - public String getSettingList() { + /*public String getSettingList() { return settingList; } public void setSettingList(String settingList) { this.settingList = settingList; - } + }*/ } /** diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java index 635d7ab00e..a9c3171fda 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/resource/AnalyzerResource.java @@ -126,9 +126,9 @@ public RESTResponse updateEmailPublisherMeta(@PathParam(Constants.SITE_ID) userEmailEntity.setUserId(userId); userEmailEntity.setModifiedTime(System.currentTimeMillis()); boolean ret = metaManagementService.updateUserEmailMeta(userEmailEntity); - String message = "Successfully add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); + String message = "Successfully update user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); if (!ret) { - message = "Failed to add user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); + message = "Failed to update user meta for " + userEmailEntity.getSiteId() + ": " + userEmailEntity.getUserId(); } response.success(ret).message(message); }).get(); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java index 933a4afc51..d992255483 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java @@ -60,7 +60,7 @@ public class Constants { public static final int DEFAULT_DEDUP_INTERVAL = 300; public static final String ANALYZER_REPORT_CONFIG_PATH = "application.analyzerReport"; - public static final String ANALYZER_REPORT_SUBJECT = "Job Performance Alert For Job: %s"; + public static final String ANALYZER_REPORT_SUBJECT = "Performance Insights For %s"; public static final String ANALYZER_REPORT_DATA_BASIC_KEY = "basic"; public static final String ANALYZER_REPORT_DATA_EXTEND_KEY = "extend"; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java index e406adfdf6..9c1a2c77de 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Utils.java @@ -25,6 +25,8 @@ import org.apache.eagle.jpm.util.resourcefetch.connection.InputStreamUtils; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +61,7 @@ public static List getJobMeta(Config config, String siteId, Strin try { is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE); LOG.info("get job meta from {}", url); - result = (List)OBJ_MAPPER.readValue(is, RESTResponse.class).getData(); + result = ((RESTResponse>)OBJ_MAPPER.readValue(is, new TypeReference>>(){})).getData(); } catch (Exception e) { LOG.warn("failed to get job meta from {}", url, e); } finally { @@ -86,7 +88,7 @@ public static List getUserMail(Config config, String siteId, St try { is = InputStreamUtils.getInputStream(url, null, org.apache.eagle.jpm.util.Constants.CompressionType.NONE); LOG.info("get user meta from {}", url); - result = (List)OBJ_MAPPER.readValue(is, RESTResponse.class).getData(); + result = ((RESTResponse>)OBJ_MAPPER.readValue(is, new TypeReference>>(){})).getData(); } catch (Exception e) { LOG.warn("failed to get user meta from {}", url, e); } finally { diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm index 996adba044..f4a666ea31 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm @@ -14,11 +14,27 @@ ~ See the License for the specific language governing permissions and ~ limitations under the License. --> - - + + + #set ( $alert = $alertList[0] ) + +## Generate Alert Color +#set($alertColor = "#337ab7") +#if($alert["alertSeverity"] == "WARNING") + #set($alertColor = "#FF9F00") +#elseif($alert["alertSeverity"] == "CRITICAL") + #set($alertColor = "#d43f3a") +#elseif ($alert["alertSeverity"] == "NOTICE") + #set($alertColor = "#68B90F") +#end + + - + + [$alert["alertSeverity"] $alert["alertSubject"] - - #set ( $elem = $alertList[0] ) - -

Basic Information:

- -
    -
  • Site: ${elem["basic"].get("site")}
  • -
  • Job Name: ${elem["basic"].get("name")}
  • -
  • User: ${elem["basic"].get("user")}
  • -
  • Job Status: ${elem["basic"].get("status")}
  • -
  • Start Time: ${elem["basic"].get("start")}
  • -
  • End Time: ${elem["basic"].get("end")}
  • -
  • Duration Time: ${elem["basic"].get("duration")}
  • -
  • Progress: ${elem["basic"].get("progress")}
  • -
  • Job Detail: ${elem["basic"].get("detail")}
  • -
- -

Analyzer Results:

- -#foreach($evaluator in ${elem["extend"].keySet()}) - - - - - - - + + + +
Analysis By $evaluator
typemessageoptimizer settinglevel
+ + + + - #foreach($result in ${elem["extend"].get($evaluator)}) - - - - - - - #end
+
+ + + + + + + +
+ $alert["alertSeverity"]: + $alert["alertSubject"] +
+
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
Basic Information
+ Severity: + $alert["alertSeverity"] +
+ Site: + ${alert["basic"].get("site")} +
+ Name: + ${alert["basic"].get("name")} +
+ User: + ${alert["basic"].get("user")} +
+ Status: + ${alert["basic"].get("status")} +
+ Progress: + ${alert["basic"].get("progress")} +
+ Start: + ${alert["basic"].get("start")} +
+ End: + ${alert["basic"].get("end")} +
+ Duration: + ${alert["basic"].get("duration")} +
+ Detail: + ${alert["basic"].get("detail")} +
+
+ + + + + + + #foreach($evaluator in ${alert["extend"].keySet()}) + #foreach($result in ${alert["extend"].get($evaluator)}) + + + + + #end + #end +
Analysis Results
levelmessage
${result.resultLevel}${result.message}
+ +
+
+
${result.ruleType}${result.message}${result.settingList}${result.resultLevel}
-#end - - \ No newline at end of file + diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql index 07e820ba87..7d725d282b 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/createTable.sql @@ -18,20 +18,21 @@ CREATE TABLE IF NOT EXISTS analysis_jobs ( uuid varchar(50) PRIMARY KEY, - jobDefId varchar(50) NOT NULL, + jobDefId varchar(100) NOT NULL, siteId varchar(50) NOT NULL, configuration mediumtext NOT NULL, evaluators mediumtext NOT NULL, createdtime bigint(20) DEFAULT NULL, modifiedtime bigint(20) DEFAULT NULL, - UNIQUE (jobDefId) + UNIQUE (siteId, jobDefId) ); CREATE TABLE IF NOT EXISTS analysis_email ( uuid varchar(50) PRIMARY KEY, userId varchar(100) NOT NULL, + siteId varchar(50) NOT NULL, mailAddress mediumtext NOT NULL, createdtime bigint(20) DEFAULT NULL, modifiedtime bigint(20) DEFAULT NULL, - UNIQUE (userId) + UNIQUE (siteId, userId) ); \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java index 2c9dc8f97a..2e56632883 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/crawler/JHFCrawlerDriverImpl.java @@ -38,7 +38,7 @@ public class JHFCrawlerDriverImpl implements JHFCrawlerDriver { private static final Logger LOG = LoggerFactory.getLogger(JHFCrawlerDriverImpl.class); - private static final int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 10; + private static final int SLEEP_SECONDS_WHILE_QUEUE_IS_EMPTY = 120; private static final String FORMAT_JOB_PROCESS_DATE = "%4d%02d%02d"; private static final Pattern PATTERN_JOB_PROCESS_DATE = Pattern.compile("([0-9]{4})([0-9]{2})([0-9]{2})"); diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java index 724c62cd9e..b174a2f5d9 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/parser/JobSuggestionListener.java @@ -74,6 +74,7 @@ public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { info.setFailedMaps(jobExecutionAPIEntity.getNumFailedMaps()); info.setTotalMaps(jobExecutionAPIEntity.getNumTotalMaps()); info.setTotalReduces(jobExecutionAPIEntity.getNumTotalReduces()); + info.setProgress(100); } } @@ -89,14 +90,6 @@ public void jobCountersCreated(JobCounters totalCounters, JobCounters mapCounter @Override public void flush() throws Exception { - if (info.getTotalCounters() == null) { - LOG.warn("Total counters of Job {} is null", info.getJobId()); - return; - } - if (info.getMapCounters() == null && info.getReduceCounters() == null) { - LOG.warn("Map/Reduce task counters of Job {} are null", info.getJobId()); - return; - } analyzer.analyze(info); } } diff --git a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java index d7daa5ec31..11deefe6f4 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java +++ b/eagle-jpm/eagle-jpm-mr-history/src/main/java/org/apache/eagle/jpm/mr/history/storm/JobHistorySpout.java @@ -175,7 +175,7 @@ public void nextTuple() { } } finally { try { - Thread.sleep(1000); + Thread.sleep(5000); } catch (Exception e) { // ignored } From 4b9f030ba8ac4abc44eb6cb0233f4a611166811f Mon Sep 17 00:00:00 2001 From: wujinhu Date: Sat, 4 Mar 2017 18:12:54 +0800 Subject: [PATCH 03/11] add jdbc storage support for sla job meta --- .../mr/sla/processors/UnExpectedLongDurationJobProcessor.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index 7e74954869..a829a84814 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -62,7 +62,8 @@ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { double expirePercent = (analyzerJobEntity.getDurationTime() - avgDurationTime) * 1.0 / avgDurationTime; for (Map.Entry entry : sorted) { if (expirePercent >= entry.getValue()) { - return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), String.format("Job duration exceeds average duration(calculated by historical executions of this job) by %d%%, average duration is %ds", + return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, entry.getKey(), + String.format("Job duration exceeds average duration(calculated by historical executions of this job) by %d%%, average duration is %ds", (int)(expirePercent * 100), avgDurationTime / 1000)); } } From e04a795853719549e95d7923dedc2bafee395f05 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 14:40:53 +0800 Subject: [PATCH 04/11] add jdbc storage support for sla job meta --- .../impl/orm/RelationToJobMetaEntity.java | 19 ++++++++++++++++++- .../UnExpectedLongDurationJobProcessor.java | 9 +++++++-- .../analyzer/publisher/EmailPublisher.java | 5 +++++ .../eagle/jpm/analyzer/publisher/Result.java | 14 ++++++++++++-- .../dedup/impl/SimpleDeduplicator.java | 8 ++------ 5 files changed, 44 insertions(+), 11 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java index ec2a59bba4..180eb8d223 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/meta/impl/orm/RelationToJobMetaEntity.java @@ -64,7 +64,11 @@ private Map parse(String field) { Iterator keyItemItr = jsonObject.keys(); while (keyItemItr.hasNext()) { String itemKey = keyItemItr.next(); - items.put(itemKey, jsonObject.get(itemKey)); + if (canParseToMap(jsonObject.getString(itemKey))) { + items.put(itemKey, parse(jsonObject.getString(itemKey))); + } else { + items.put(itemKey, jsonObject.get(itemKey)); + } } } catch (Exception e) { @@ -73,4 +77,17 @@ private Map parse(String field) { return items; } + + private boolean canParseToMap(String field) { + try { + JSONObject jsonObject = new JSONObject(field); + Iterator keyItemItr = jsonObject.keys(); + while (keyItemItr.hasNext()) { + keyItemItr.next(); + } + return true; + } catch (Exception e) { + return false; + } + } } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index a829a84814..b9a708fe24 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -48,14 +48,19 @@ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); Map jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration(); - long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); + //long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); + long avgDurationTime = (long)(analyzerJobEntity.getDurationTime() * 0.9); + if (avgDurationTime == 0L) { return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); } Map alertThreshold = Constants.DEFAULT_ALERT_THRESHOLD; if (jobMetaData.containsKey(Constants.ALERT_THRESHOLD_KEY)) { - alertThreshold = (Map)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY); + Map alertThresholds = (Map)jobMetaData.get(Constants.ALERT_THRESHOLD_KEY); + for (String level : alertThresholds.keySet()) { + alertThreshold.put(Result.ResultLevel.fromString(level), alertThresholds.get(level)); + } } List> sorted = Utils.sortByValue(alertThreshold); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index e7020ee775..84138a8858 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -53,6 +53,11 @@ public EmailPublisher(Config config) { @Override //will refactor, just work now public void publish(AnalyzerEntity analyzerJobEntity, Result result) { + if (!config.hasPath(Constants.ANALYZER_REPORT_CONFIG_PATH)) { + LOG.warn("no email configuration, skip send email"); + return; + } + if (result.getAlertMessages().size() == 0) { return; } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java index 4ea46b260c..196ca5d7cf 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -17,7 +17,6 @@ package org.apache.eagle.jpm.analyzer.publisher; -import org.apache.commons.lang3.StringUtils; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import java.util.ArrayList; @@ -78,7 +77,18 @@ public enum ResultLevel { NONE, NOTICE, WARNING, - CRITICAL + CRITICAL; + + private static final Map stringToLevels = new HashMap<>(); + static { + for (ResultLevel level : values()) { + stringToLevels.put(level.toString(), level); + } + } + + public static ResultLevel fromString(String levelString) { + return stringToLevels.get(levelString); + } } public enum RuleType { diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java index 365f60ad1d..8d7aad1fc3 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -40,17 +40,13 @@ public class SimpleDeduplicator implements AlertDeduplicator, Serializable { public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) { long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; if (analyzerJobEntity.getJobMeta().getConfiguration().containsKey(Constants.DEDUP_INTERVAL_KEY)) { - dedupInterval = (Long)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY); + dedupInterval = (Integer)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY); } dedupInterval = dedupInterval * 1000; long currentTimeStamp = System.currentTimeMillis(); if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { - if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { - return true; - } else { - return false; - } + return lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp; } else { lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); return false; From 09083cb3971ec6305419a5e13cd674da8ddcbde1 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 14:42:40 +0800 Subject: [PATCH 05/11] add jdbc storage support for sla job meta --- .../mr/sla/processors/UnExpectedLongDurationJobProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index b9a708fe24..88e799d67d 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -48,8 +48,7 @@ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); Map jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration(); - //long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); - long avgDurationTime = (long)(analyzerJobEntity.getDurationTime() * 0.9); + long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); if (avgDurationTime == 0L) { return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); From 77f33f0475cbefa1a26ed6f3b0067ac538a9a2cc Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 16:11:21 +0800 Subject: [PATCH 06/11] fix MapRFSAuditLogAppProviderTest.testStartAsManagedApplication --- .../app/test/ApplicationSimulatorImpl.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java index a5f5a7343f..d67499212f 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -29,6 +29,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; public class ApplicationSimulatorImpl extends ApplicationSimulator { @@ -74,24 +75,25 @@ public void start(String appType, Map appConfig) { // Start application applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid())); statusUpdateService.updateApplicationEntityStatus(applicationEntity); - applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid())); - int attempt = 0; - while (attempt < 10) { - attempt++; - statusUpdateService.updateApplicationEntityStatus(applicationEntity); - if (applicationEntity.getStatus() == ApplicationEntity.Status.STOPPED - || applicationEntity.getStatus() == ApplicationEntity.Status.INITIALIZED) { - break; - } else { + Semaphore semp = new Semaphore(1); + Thread stopThread = new Thread(() -> { + applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid())); + while (applicationEntity.getStatus() != ApplicationEntity.Status.INITIALIZED + && applicationEntity.getStatus() != ApplicationEntity.Status.STOPPED) { + statusUpdateService.updateApplicationEntityStatus(applicationEntity); try { - Thread.sleep(500); - } catch (InterruptedException e) { - // Ignore + Thread.sleep(1000); + } catch (Exception e) { } } - } - if (attempt >= 10 ) { - throw new IllegalStateException("Application status didn't become STOPPED in 10 attempts"); + semp.release(); + }); + stopThread.start(); + try { + stopThread.join(); + semp.acquire(); + } catch (Exception e) { + throw new IllegalStateException("Application status didn't become STOPPED"); } applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); } From a1556791e25d16ae72695f794396b7486c5a6a63 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 16:40:54 +0800 Subject: [PATCH 07/11] fix MapRFSAuditLogAppProviderTest.testStartAsManagedApplication --- .../org/apache/eagle/app/test/ApplicationSimulatorImpl.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java index d67499212f..6c8cde300a 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -26,6 +26,8 @@ import org.apache.eagle.metadata.resource.SiteResource; import org.apache.eagle.metadata.service.ApplicationStatusUpdateService; import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -33,6 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger; public class ApplicationSimulatorImpl extends ApplicationSimulator { + private static final Logger LOG = LoggerFactory.getLogger(ApplicationSimulatorImpl.class); + private final Config config; private final SiteResource siteResource; private final ApplicationResource applicationResource; @@ -84,6 +88,7 @@ public void start(String appType, Map appConfig) { try { Thread.sleep(1000); } catch (Exception e) { + LOG.warn("{}", e); } } semp.release(); From d7b02a99bf70e58f854d18059387387e32ec35fe Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 17:38:36 +0800 Subject: [PATCH 08/11] move dedup from publisher --- .../app/test/ApplicationSimulatorImpl.java | 7 +++-- .../analyzer/mr/MRJobPerformanceAnalyzer.java | 10 ++++++ .../UnExpectedLongDurationJobProcessor.java | 3 +- .../publisher/EagleStorePublisher.java | 6 ---- .../analyzer/publisher/EmailPublisher.java | 6 ---- .../dedup/impl/SimpleDeduplicator.java | 31 ++++++++++++------- 6 files changed, 35 insertions(+), 28 deletions(-) diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java index 6c8cde300a..b10205f0a7 100644 --- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java +++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulatorImpl.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class ApplicationSimulatorImpl extends ApplicationSimulator { @@ -95,12 +96,12 @@ public void start(String appType, Map appConfig) { }); stopThread.start(); try { - stopThread.join(); - semp.acquire(); + stopThread.join(60000L); + semp.tryAcquire(60, TimeUnit.SECONDS); + applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); } catch (Exception e) { throw new IllegalStateException("Application status didn't become STOPPED"); } - applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid())); } @Override diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java index e32a37c0f4..34365dc768 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/MRJobPerformanceAnalyzer.java @@ -27,6 +27,8 @@ import org.apache.eagle.jpm.analyzer.publisher.EmailPublisher; import org.apache.eagle.jpm.analyzer.publisher.Publisher; import org.apache.eagle.jpm.analyzer.publisher.Result; +import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; +import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +43,7 @@ public class MRJobPerformanceAnalyzer implements JobAn private List publishers = new ArrayList<>(); private Config config; + private AlertDeduplicator alertDeduplicator; public MRJobPerformanceAnalyzer(Config config) { this.config = config; @@ -49,6 +52,8 @@ public MRJobPerformanceAnalyzer(Config config) { publishers.add(new EagleStorePublisher(config)); publishers.add(new EmailPublisher(config)); + + this.alertDeduplicator = new SimpleDeduplicator(); } @Override @@ -62,6 +67,11 @@ public void analyze(T analyzerJobEntity) throws Exception { } } + if (alertDeduplicator.dedup(analyzerJobEntity, result)) { + LOG.info("skip publish job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); + return; + } + for (Publisher publisher : publishers) { publisher.publish(analyzerJobEntity, result); } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index 88e799d67d..d9a608348d 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -48,7 +48,8 @@ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); Map jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration(); - long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); + //long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); + long avgDurationTime = (long)(analyzerJobEntity.getDurationTime() * 0.7); if (avgDurationTime == 0L) { return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java index 0d7d2d7783..1c5a03398c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EagleStorePublisher.java @@ -37,11 +37,9 @@ public class EagleStorePublisher implements Publisher, Serializable { private Config config; private IEagleServiceClient client; - private AlertDeduplicator alertDeduplicator; public EagleStorePublisher(Config config) { this.config = config; - this.alertDeduplicator = new SimpleDeduplicator(); } @Override @@ -51,10 +49,6 @@ public void publish(AnalyzerEntity analyzerJobEntity, Result result) { } LOG.info("EagleStorePublisher gets job {}", analyzerJobEntity.getJobDefId()); - if (alertDeduplicator.dedup(analyzerJobEntity, result)) { - LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); - return; - } try { this.client = new EagleServiceClientImpl(config); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index 84138a8858..c70ccf461e 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -43,11 +43,9 @@ public class EmailPublisher implements Publisher, Serializable { private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class); private Config config; - private AlertDeduplicator alertDeduplicator; public EmailPublisher(Config config) { this.config = config; - this.alertDeduplicator = new SimpleDeduplicator(); } @Override @@ -63,10 +61,6 @@ public void publish(AnalyzerEntity analyzerJobEntity, Result result) { } LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId()); - if (alertDeduplicator.dedup(analyzerJobEntity, result)) { - LOG.info("skip job {} alert because it is duplicated", analyzerJobEntity.getJobDefId()); - return; - } Map basic = new HashMap<>(); basic.put("site", analyzerJobEntity.getSiteId()); diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java index 8d7aad1fc3..b5723e98cb 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -34,22 +34,29 @@ public class SimpleDeduplicator implements AlertDeduplicator, Serializable { private static final Logger LOG = LoggerFactory.getLogger(SimpleDeduplicator.class); - private Map lastUpdateTime = new HashMap<>(); + private static Map lastUpdateTime = new HashMap<>(); @Override public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) { - long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; - if (analyzerJobEntity.getJobMeta().getConfiguration().containsKey(Constants.DEDUP_INTERVAL_KEY)) { - dedupInterval = (Integer)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY); - } + synchronized (lastUpdateTime) { + long dedupInterval = Constants.DEFAULT_DEDUP_INTERVAL; + if (analyzerJobEntity.getJobMeta().getConfiguration().containsKey(Constants.DEDUP_INTERVAL_KEY)) { + dedupInterval = (Integer)analyzerJobEntity.getJobMeta().getConfiguration().get(Constants.DEDUP_INTERVAL_KEY); + } - dedupInterval = dedupInterval * 1000; - long currentTimeStamp = System.currentTimeMillis(); - if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { - return lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp; - } else { - lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); - return false; + dedupInterval = dedupInterval * 1000; + long currentTimeStamp = System.currentTimeMillis(); + if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { + if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { + return true; + } else { + lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); + return false; + } + } else { + lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); + return false; + } } } } From 9cb241b8f31e06cb459ef38d9553e6dcc820783e Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 17:39:27 +0800 Subject: [PATCH 09/11] move dedup from publisher --- .../mr/sla/processors/UnExpectedLongDurationJobProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java index d9a608348d..88e799d67d 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/sla/processors/UnExpectedLongDurationJobProcessor.java @@ -48,8 +48,7 @@ public Result.ProcessorResult process(AnalyzerEntity analyzerJobEntity) { LOG.info("Job {} In UnExpectedLongDurationJobProcessor", analyzerJobEntity.getJobDefId()); Map jobMetaData = analyzerJobEntity.getJobMeta().getConfiguration(); - //long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); - long avgDurationTime = (long)(analyzerJobEntity.getDurationTime() * 0.7); + long avgDurationTime = getAvgDuration(analyzerJobEntity, jobMetaData); if (avgDurationTime == 0L) { return new Result.ProcessorResult(Result.RuleType.LONG_DURATION_JOB, Result.ResultLevel.NONE, Constants.PROCESS_NONE); From 818d03bd55a2e71cac8df11c80fa15e22ecf19d0 Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 18:08:32 +0800 Subject: [PATCH 10/11] change NOTICE to INFO --- .../mr/suggestion/MapReduceCompressionSettingProcessor.java | 2 +- .../analyzer/mr/suggestion/MapReduceDataSkewProcessor.java | 2 +- .../jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java | 2 +- .../mr/suggestion/MapReduceQueueResourceProcessor.java | 4 +--- .../jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java | 2 +- .../mr/suggestion/MapReduceSplitSettingProcessor.java | 5 +---- .../analyzer/mr/suggestion/MapReduceTaskNumProcessor.java | 2 +- .../apache/eagle/jpm/analyzer/publisher/EmailPublisher.java | 5 +---- .../java/org/apache/eagle/jpm/analyzer/publisher/Result.java | 2 +- .../java/org/apache/eagle/jpm/analyzer/util/Constants.java | 2 +- .../src/main/resources/AnalyzerReportTemplate.vm | 2 +- 11 files changed, 11 insertions(+), 19 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java index 8638376e33..62c5c2bf3c 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceCompressionSettingProcessor.java @@ -75,7 +75,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.NOTICE, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.COMPRESS, Result.ResultLevel.INFO, sb.toString(), optSettings); } return null; } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java index 2d1611bc6b..b21a927ba2 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceDataSkewProcessor.java @@ -53,7 +53,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.NOTICE, sb.toString()); + return new Result.ProcessorResult(Result.RuleType.DATA_SKEW, Result.ResultLevel.INFO, sb.toString()); } } catch (NullPointerException e) { // When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java index 30fb68fb4c..4007747543 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceGCTimeProcessor.java @@ -68,7 +68,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.NOTICE, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.GC_TIME, Result.ResultLevel.INFO, sb.toString(), optSettings); } } catch (NullPointerException e) { // When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java index ad140fbebf..a86eb72554 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceQueueResourceProcessor.java @@ -26,8 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; - /* * Criterion: (TimeElapsed / (numTasks / 500 * avgTaskTime)) > 20 */ @@ -70,7 +68,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.NOTICE, sb.toString()); + return new Result.ProcessorResult(Result.RuleType.RESOURCE_CONTENTION, Result.ResultLevel.INFO, sb.toString()); } } } catch (Exception e) { diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java index 6bc69d62d7..96be2d5033 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSpillProcessor.java @@ -100,7 +100,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) } if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.NOTICE, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.SPILL, Result.ResultLevel.INFO, sb.toString(), optSettings); } } catch (NullPointerException e) { //When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java index 5be8e7b83b..28e11292a5 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceSplitSettingProcessor.java @@ -22,9 +22,6 @@ import org.apache.eagle.jpm.analyzer.publisher.Result; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import java.util.ArrayList; -import java.util.List; - public class MapReduceSplitSettingProcessor implements Processor { private MapReduceJobSuggestionContext context; @@ -40,7 +37,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) if (context.getJobconf().getLong(FileInputFormat.SPLIT_MINSIZE, 0) > 1) { sb.append("Best practice: don't set " + FileInputFormat.SPLIT_MINSIZE); sb.append(", because it may lower data locality, hence maps will run slower.\n"); - return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.NOTICE, sb.toString()); + return new Result.ProcessorResult(Result.RuleType.SPLIT, Result.ResultLevel.INFO, sb.toString()); } return null; } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java index 024ae00b71..00d5cc988f 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/mr/suggestion/MapReduceTaskNumProcessor.java @@ -45,7 +45,7 @@ public Result.ProcessorResult process(MapReduceAnalyzerEntity jobAnalysisEntity) sb.append(analyzeMapTaskNum(optSettings)); if (sb.length() > 0) { - return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.NOTICE, sb.toString(), optSettings); + return new Result.ProcessorResult(Result.RuleType.TASK_NUMBER, Result.ResultLevel.INFO, sb.toString(), optSettings); } } catch (NullPointerException e) { // When job failed there may not have counters, so just ignore it diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java index c70ccf461e..471dbf8c78 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/EmailPublisher.java @@ -26,15 +26,12 @@ import org.apache.eagle.common.mail.AlertEmailContext; import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; -import org.apache.eagle.jpm.analyzer.publisher.dedup.AlertDeduplicator; -import org.apache.eagle.jpm.analyzer.publisher.dedup.impl.SimpleDeduplicator; import org.apache.eagle.jpm.analyzer.util.Constants; import org.apache.eagle.jpm.analyzer.util.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -117,7 +114,7 @@ private String getJobLink(AnalyzerEntity analyzerJobEntity) { private void setAlertLevel(Map alertData, Result.ResultLevel level) { if (!alertData.containsKey(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY)) { - alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, Result.ResultLevel.NOTICE.toString()); + alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, Result.ResultLevel.INFO.toString()); } if (level.equals(Result.ResultLevel.CRITICAL)) { diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java index 196ca5d7cf..748a5d5f3e 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/Result.java @@ -75,7 +75,7 @@ private void normalizeResult(ProcessorResult processorResult) { public enum ResultLevel { NONE, - NOTICE, + INFO, WARNING, CRITICAL; diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java index d992255483..4ddc27eb85 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/util/Constants.java @@ -50,7 +50,7 @@ public class Constants { public static final String ALERT_THRESHOLD_KEY = "alert.threshold"; public static final Map DEFAULT_ALERT_THRESHOLD = new HashMap() { { - put(Result.ResultLevel.NOTICE, 0.1); + put(Result.ResultLevel.INFO, 0.1); put(Result.ResultLevel.WARNING, 0.3); put(Result.ResultLevel.CRITICAL, 0.5); } diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm index f4a666ea31..0486a338e8 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/resources/AnalyzerReportTemplate.vm @@ -25,7 +25,7 @@ #set($alertColor = "#FF9F00") #elseif($alert["alertSeverity"] == "CRITICAL") #set($alertColor = "#d43f3a") -#elseif ($alert["alertSeverity"] == "NOTICE") +#elseif ($alert["alertSeverity"] == "INFO") #set($alertColor = "#68B90F") #end From 4ec8b9ce429587e9ba43a527c3f7184a9bb1d80a Mon Sep 17 00:00:00 2001 From: wujinhu Date: Mon, 6 Mar 2017 18:11:46 +0800 Subject: [PATCH 11/11] change code style --- .../publisher/dedup/impl/SimpleDeduplicator.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java index b5723e98cb..f8155f1948 100644 --- a/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java +++ b/eagle-jpm/eagle-jpm-analyzer/src/main/java/org/apache/eagle/jpm/analyzer/publisher/dedup/impl/SimpleDeduplicator.java @@ -47,12 +47,12 @@ public boolean dedup(AnalyzerEntity analyzerJobEntity, Result result) { dedupInterval = dedupInterval * 1000; long currentTimeStamp = System.currentTimeMillis(); if (lastUpdateTime.containsKey(analyzerJobEntity.getJobDefId())) { - if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { - return true; - } else { - lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); - return false; - } + if (lastUpdateTime.get(analyzerJobEntity.getJobDefId()) + dedupInterval > currentTimeStamp) { + return true; + } else { + lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); + return false; + } } else { lastUpdateTime.put(analyzerJobEntity.getJobDefId(), currentTimeStamp); return false;