From 13dc0ff75ae812aef1e599fca36809db704ee840 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:34:09 +0800 Subject: [PATCH 01/14] refine publishmentType --- eagle-assembly/src/main/doc/metadata-ddl.sql | 7 ------- .../coordinator/DeduplicationDefinition.java | 7 +++++++ .../engine/coordinator/PublishmentType.java | 21 ++++++++----------- 3 files changed, 16 insertions(+), 19 deletions(-) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java diff --git a/eagle-assembly/src/main/doc/metadata-ddl.sql b/eagle-assembly/src/main/doc/metadata-ddl.sql index 0334623e26..3312576fac 100644 --- a/eagle-assembly/src/main/doc/metadata-ddl.sql +++ b/eagle-assembly/src/main/doc/metadata-ddl.sql @@ -164,10 +164,3 @@ CREATE TABLE IF NOT EXISTS analysis_email ( modifiedtime bigint(20) DEFAULT NULL, UNIQUE (siteId, userId) ); - -INSERT INTO publishment_type(id, content) VALUES -('Kafka', '{"name":"Kafka","type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'), -('Email', '{"name":"Email","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'), -('Slack', '{"name":"Slack","type":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'), -('HBaseStorage', '{"name":"HBaseStorage","type":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'), -('JDBCStorage', '{"name":"JDBCStorage","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEagleStorePlugin","description":null,"fields":[]}'); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java new file mode 100644 index 0000000000..1eb7952903 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java @@ -0,0 +1,7 @@ +package org.apache.eagle.alert.engine.coordinator; + +/** + * Created by qingwzhao on 4/6/17. + */ +public class DeduplicationDefinition { +} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index f7025f26a4..3119ee635f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -26,17 +26,6 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class PublishmentType { private String name; - - @Override - public String toString() { - return "PublishmentType{" - + "name='" + name + '\'' - + ", type='" + type + '\'' - + ", description='" + description + '\'' - + ", fields=" + fields - + '}'; - } - private String type; private String description; private List> fields = new LinkedList<>(); @@ -73,7 +62,15 @@ public void setFields(List> fields) { this.fields = fields; } - + @Override + public String toString() { + return "PublishmentType{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + ", description='" + description + '\'' + + ", fields=" + fields + + '}'; + } @Override public boolean equals(Object obj) { From 5953465b12c87f0d0f98a5220da87e35d78a9c2e Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:35:23 +0800 Subject: [PATCH 02/14] update policy deletion --- .../metadata/impl/JdbcMetadataDaoImpl.java | 2 +- .../metadata/impl/JdbcMetadataHandler.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index e0b5c9dff3..6427d8cfe0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -227,7 +227,7 @@ public OpResult removeDataSource(String datasourceId) { @Override public OpResult removePolicy(String policyId) { - return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId); + return handler.removePolicyById(PolicyDefinition.class.getSimpleName(), policyId); } @Override diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java index a9e3c5e400..7fffa55c56 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java @@ -455,6 +455,36 @@ public OpResult addPublishmentsToPolicy(String policyId, List publishmen return result; } + public OpResult removePolicyById(String clzName, String policyId) { + Connection connection = null; + PreparedStatement statement = null; + OpResult result = new OpResult(); + try { + String tb = getTableName(clzName); + connection = dataSource.getConnection(); + connection.setAutoCommit(false); + statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb)); + statement.setString(1, policyId); + int status = statement.executeUpdate(); + LOG.info("delete {} policy {} from {}", status, policyId, tb); + closeResource(null, statement, null); + + statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT); + statement.setString(1, policyId); + status = statement.executeUpdate(); + LOG.info("delete {} records from policy_publishment", status); + + connection.commit(); + connection.setAutoCommit(true); + } catch (SQLException e) { + e.printStackTrace(); + } finally { + closeResource(null, statement, connection); + } + LOG.info(result.message); + return result; + } + public OpResult removeById(String clzName, String key) { Connection connection = null; PreparedStatement statement = null; From 7faf383445626ccc7650cf925f8637c0dbc204ac Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 7 Apr 2017 21:50:01 +0800 Subject: [PATCH 03/14] add AlertDeduplication --- .../coordinator/AlertDeduplication.java | 71 +++++++ .../coordinator/DeduplicationDefinition.java | 7 - .../engine/coordinator/PolicyDefinition.java | 13 +- .../publisher/email/AlertEmailGenerator.java | 4 +- .../publisher/impl/AbstractPublishPlugin.java | 17 +- .../publisher/impl/AlertEmailPublisher.java | 2 - .../publisher/impl/AlertPublisherImpl.java | 43 ++--- .../publisher/impl/DefaultDeduplicator.java | 10 +- .../engine/runner/AlertPublisherBolt.java | 17 +- .../mr/history/JHFEventReaderBaseTest.java | 74 ++++++++ .../src/test/resources/application.conf | 74 ++++++++ .../job_1479206441898_508949_conf.xml | 178 ++++++++++++++++-- 12 files changed, 451 insertions(+), 59 deletions(-) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java delete mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java create mode 100644 eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java create mode 100644 eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java new file mode 100644 index 0000000000..78fef7aed3 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.apache.commons.collections.ListUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.List; +import java.util.Objects; + +public class AlertDeduplication { + private String dedupIntervalMin; + private List dedupFields; + + public String getDedupIntervalMin() { + return dedupIntervalMin; + } + + public void setDedupIntervalMin(String dedupIntervalMin) { + this.dedupIntervalMin = dedupIntervalMin; + } + + public List getDedupFields() { + return dedupFields; + } + + public void setDedupFields(List dedupFields) { + this.dedupFields = dedupFields; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(dedupFields) + .append(dedupIntervalMin) + .build(); + } + + @Override + public boolean equals(Object that) { + if (that == this) { + return true; + } + if (!(that instanceof AlertDeduplication)) { + return false; + } + AlertDeduplication another = (AlertDeduplication) that; + if (ListUtils.isEqualList(another.dedupFields, this.dedupFields) + && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) { + return true; + } + return false; + } + + +} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java deleted file mode 100644 index 1eb7952903..0000000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.eagle.alert.engine.coordinator; - -/** - * Created by qingwzhao on 4/6/17. - */ -public class DeduplicationDefinition { -} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index c377e41d43..5004513a41 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -43,6 +43,7 @@ public class PolicyDefinition implements Serializable { private Definition stateDefinition; private PolicyStatus policyStatus = PolicyStatus.ENABLED; private AlertDefinition alertDefinition; + private AlertDeduplication deduplication; // one stream only have one partition in one policy, since we don't support stream alias private List partitionSpec = new ArrayList(); @@ -147,6 +148,7 @@ public int hashCode() { .append(policyStatus) .append(parallelismHint) .append(alertDefinition) + .append(deduplication) .build(); } @@ -172,7 +174,8 @@ public boolean equals(Object that) { && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) && another.policyStatus.equals(this.policyStatus) && another.parallelismHint == this.parallelismHint - && Objects.equals(another.alertDefinition, alertDefinition)) { + && Objects.equals(another.alertDefinition, alertDefinition) + && Objects.equals(another.deduplication, deduplication)) { return true; } return false; @@ -202,6 +205,14 @@ public void setSiteId(String siteId) { this.siteId = siteId; } + public AlertDeduplication getDeduplication() { + return deduplication; + } + + public void setDeduplication(AlertDeduplication deduplication) { + this.deduplication = deduplication; + } + @JsonIgnoreProperties(ignoreUnknown = true) public static class Definition implements Serializable { private static final long serialVersionUID = -622366527887848346L; diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java index 1bcac17f0a..a57941e43f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java @@ -139,7 +139,9 @@ private Map buildAlertContext(AlertStreamEvent event) { alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event)); alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, event.getCategory()); alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, event.getSeverity().toString()); - alertContext.put(PublishConstants.ALERT_EMAIL_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime())); + alertContext.put(PublishConstants.ALERT_EMAIL_TIME, String.format("%s %s", + DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()), + DateTimeUtil.CURRENT_TIME_ZONE.getID())); alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId()); alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy()); alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index b155bb887f..c5c9e040da 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -29,6 +29,7 @@ import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator; import org.slf4j.Logger; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,8 +72,14 @@ public void init(Config config, Publishment publishment, Map conf) throws Except getLogger().error(String.format("initialize extended deduplicator %s failed", spec.getClassName()), t); } } else { - this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(), - publishment.getDedupFields(), publishment.getDedupStateField(), publishment.getDedupStateCloseValue(), dedupCache); + if (publishment.getDedupIntervalMin() != null && !publishment.getDedupIntervalMin().isEmpty()) { + this.deduplicator = new DefaultDeduplicator( + publishment.getDedupIntervalMin(), + publishment.getDedupFields(), + publishment.getDedupStateField(), + publishment.getDedupStateCloseValue(), + dedupCache); + } this.pubName = publishment.getName(); } String serializerClz = publishment.getSerializer(); @@ -98,7 +105,11 @@ public void update(String dedupIntervalMin, Map pluginProperties @Override public List dedup(AlertStreamEvent event) { - return deduplicator.dedup(event); + if (null != deduplicator) { + return deduplicator.dedup(event); + } else { + return Collections.singletonList(event); + } } @Override diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java index 152a9f1a44..f40680cfdd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java @@ -40,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*; import static org.apache.eagle.common.mail.AlertEmailConstants.*; public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { @@ -215,7 +214,6 @@ public PublishmentType getPluginType() { .name("Email") .type(AlertEmailPublisher.class) .description("Email alert publisher") - .field("subject") .field("sender") .field("recipients") .build(); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index 5b902f95a1..e38799f462 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -43,7 +43,10 @@ public class AlertPublisherImpl implements AlertPublisher { private final String name; - private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + // + private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + //private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + private Config config; private Map conf; @@ -73,11 +76,11 @@ public void nextEvent(PublishPartition partition, AlertStreamEvent event) { private void notifyAlert(PublishPartition partition, AlertStreamEvent event) { // remove the column values for publish plugin match partition.getColumnValues().clear(); - if (!publishPluginMapping.containsKey(partition)) { + if (!publishPluginMapping.containsKey(partition.getPublishId())) { LOG.warn("PublishPartition {} is not found in publish plugin map", partition); return; } - AlertPublishPlugin plugin = publishPluginMapping.get(partition); + AlertPublishPlugin plugin = publishPluginMapping.get(partition.getPublishId()); if (plugin == null) { LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition); return; @@ -120,7 +123,7 @@ public synchronized void onPublishChange(List added, } // copy and swap to avoid concurrency issue - Map newPublishMap = new HashMap<>(publishPluginMapping); + Map newPublishMap = new HashMap<>(publishPluginMapping); // added for (Publishment publishment : added) { @@ -128,9 +131,7 @@ public synchronized void onPublishChange(List added, AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (plugin != null) { - for (PublishPartition p : getPublishPartitions(publishment)) { - newPublishMap.put(p, plugin); - } + newPublishMap.put(publishment.getName(), plugin); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -138,16 +139,9 @@ public synchronized void onPublishChange(List added, //removed List toBeClosed = new ArrayList<>(); for (Publishment publishment : removed) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.remove(p); - } else { - newPublishMap.remove(p); - } - } - if (plugin != null) { - toBeClosed.add(plugin); + AlertPublishPlugin publishPlugin = newPublishMap.remove(publishment.getName()); + if (publishPlugin != null) { + toBeClosed.add(publishPlugin); } } // updated @@ -155,16 +149,11 @@ public synchronized void onPublishChange(List added, // for updated publishment, need to init them too AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (newPlugin != null) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.get(p); - } - newPublishMap.put(p, newPlugin); - } - if (plugin != null) { - toBeClosed.add(plugin); + AlertPublishPlugin oldPlugin = newPublishMap.get(publishment.getName()); + if (oldPlugin != null) { + toBeClosed.add(oldPlugin); } + newPublishMap.put(publishment.getName(), newPlugin); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -199,7 +188,7 @@ private void closePlugins(List toBeClosed) { try { p.close(); } catch (Exception e) { - LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e); + LOG.error("Error when close publish plugin {}!", p.getClass().getCanonicalName(), e); } } } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java index ac99db3780..54d551e685 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java @@ -20,6 +20,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.AlertDeduplication; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; @@ -59,6 +60,13 @@ public DefaultDeduplicator(long intervalMin) { this.dedupIntervalSec = intervalMin; } + public DefaultDeduplicator(AlertDeduplication alertDeduplication) { + this.customDedupFields = alertDeduplication.getDedupFields(); + this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60; + this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( + this.dedupIntervalSec, TimeUnit.SECONDS).build(); + } + public DefaultDeduplicator(String intervalMin, List customDedupFields, String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) { setDedupIntervalMin(intervalMin); @@ -81,7 +89,7 @@ public DefaultDeduplicator(String intervalMin, List customDedupFields, * @param key * @return */ - public List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { + private List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { if (StringUtils.isBlank(stateFiledValue)) { // without state field, we cannot determine whether it is duplicated // without custom filed values, we cannot determine whether it is duplicated diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 44a5fe91db..ce46e9c329 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -33,6 +33,7 @@ import org.apache.eagle.alert.engine.publisher.AlertStreamFilter; import org.apache.eagle.alert.engine.publisher.PipeStreamFilter; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; +import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider; import org.apache.eagle.alert.utils.AlertConstants; @@ -43,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener { @@ -51,6 +53,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli private volatile Map cachedPublishments = new HashMap<>(); private volatile Map policyDefinitionMap; private volatile Map streamDefinitionMap; + private volatile Map deduplicatorMap = new ConcurrentHashMap<>(); private AlertTemplateEngine alertTemplateEngine; private boolean logEventEnabled; @@ -87,6 +90,12 @@ public void execute(Tuple input) { if (logEventEnabled) { LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event); } + if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) { + List eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event); + if (eventList == null || eventList.isEmpty()) { + return; + } + } AlertStreamEvent filteredEvent = alertFilter.filter(event); if (filteredEvent != null) { alertPublisher.nextEvent(partition, filteredEvent); @@ -139,7 +148,7 @@ public synchronized void onAlertPublishSpecChange(PublishSpec pubSpec, Map pds, Map sds) { + public synchronized void onAlertPolicyChange(Map pds, Map sds) { List policyToRemove = new ArrayList<>(); if (this.policyDefinitionMap != null) { policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList())); @@ -151,6 +160,9 @@ public void onAlertPolicyChange(Map pds, Map entry : pds.entrySet()) { try { this.alertTemplateEngine.register(entry.getValue()); + if (entry.getValue().getDeduplication() != null) { + this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication())); + } } catch (Throwable throwable) { LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable); } @@ -159,6 +171,9 @@ public void onAlertPolicyChange(Map pds, Map confKeyPatterns = new ArrayList<>(); + confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB); + confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB); + confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB); + confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB); + for (String key : confKeyPatterns) { + builder.includeJobKeyPatterns(Pattern.compile(key)); + } + JobHistoryContentFilter filter = builder.build(); + + MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(ConfigFactory.load()); + Map tags = new HashMap<>(); + tags.put("site", "sandbox"); + tags.put("jobId", "job_1490593856016_152289"); + tags.put("jobType", "HIVE"); + tags.put("jobDefId", "INSERT OVERWRITE TABLE kyl...'2017-04-06')))(Stage-1)"); + JHFMRVer2EventReader reader = new JHFMRVer2EventReader(tags, conf, filter, appConfig); + reader.addListener(new JobConfigurationCreationServiceListener(appConfig.getEagleServiceConfig()) { + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + Assert.assertTrue(null != entity); + Assert.assertTrue(entity instanceof JobConfigurationAPIEntity); + JobConfigurationAPIEntity configurationAPIEntity = (JobConfigurationAPIEntity) entity; + Assert.assertTrue(configurationAPIEntity.getJobConfig().getConfig().size() == 1); + } + }); + reader.parseConfiguration(); + } +} diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf new file mode 100644 index 0000000000..00b14a85ac --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "appId":"mrHistoryJob", + "mode":"LOCAL", + "workers" : 3, + "siteId" : "sandbox", + application.storm.nimbusHost=localhost + + "stormConfig" : { + "mrHistoryJobSpoutTasks" : 6, + "jobKafkaSinkTasks" : 1, + "taskAttemptKafkaSinkTasks" : 1 + }, + + "zookeeper" : { + "zkQuorum" : "sandbox.hortonworks.com:2181", + "zkRoot" : "/test_mrjobhistory", + "zkSessionTimeoutMs" : 15000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 20000 + }, + + "endpointConfig" : { + "timeZone" : "UTC", + "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888", + "basePath" : "/mr-history/done", + "hdfs" : { + fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020", + #if not need, then do not set + # hdfs.kerberos.principal = , + # hdfs.keytab.file = + # .... + } + }, + + "service": { + "host": "localhost", + "port": 9090, + "username": "admin", + "password": "secret", + "readTimeOutSeconds" : 10, + context = "/rest" + }, + + "dataSinkConfig": { + "topic" : "map_reduce_failed_job", + "brokerList" : "sandbox.hortonworks.com:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + "producerType" : "async", + "numBatchMessages" : "4096", + "maxQueueBufferMs" : "5000", + "requestRequiredAcks" : "0" + }, + + "MRConfigureKeys" : { + "jobNameKey" : "eagle.job.name", + "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts" + } +} \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml index 6d22996727..dfccf8e5d3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml @@ -1,18 +1,164 @@ - + + hive.optimize.skewjoin.compiletime + false + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.query.string + + select a.phone_number from customer_details a, call_detail_records b where a.phone_number=b.phone_number + + programatically + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + dfs.blockreport.initialDelay + 120 + hdfs-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.reduce.markreset.buffer.percent + 0.0 + mapred-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + file.client-write-packet-size + 65536 + core-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + + hadoop.http.authentication.simple.anonymous.allowed + + true + core-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.querylog.location + /tmp/hive + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + yarn.timeline-service.leveldb-timeline-store.path + /hadoop/yarn/timeline + yarn-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + yarn.resourcemanager.proxy-user-privileges.enabled + false + yarn-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.exec.script.allow.partial.consumption + false + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.server2.global.init.file.location + /etc/hive/conf + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + + yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms + + 10000 + yarn-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + dfs.datanode.slow.io.warning.threshold.ms + 300 + hdfs-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.support.concurrency + true + file:/etc/hive/conf/hive-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.reduce.shuffle.merge.percent + 0.66 + mapred-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.task.skip.start.attempts + 2 + mapred-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + \ No newline at end of file From 92165c143386274616c8fcb5483c7b1b46fdb42b Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:34:09 +0800 Subject: [PATCH 04/14] refine publishmentType --- eagle-assembly/src/main/doc/metadata-ddl.sql | 7 ------- .../coordinator/DeduplicationDefinition.java | 7 +++++++ .../engine/coordinator/PublishmentType.java | 21 ++++++++----------- 3 files changed, 16 insertions(+), 19 deletions(-) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java diff --git a/eagle-assembly/src/main/doc/metadata-ddl.sql b/eagle-assembly/src/main/doc/metadata-ddl.sql index 0334623e26..3312576fac 100644 --- a/eagle-assembly/src/main/doc/metadata-ddl.sql +++ b/eagle-assembly/src/main/doc/metadata-ddl.sql @@ -164,10 +164,3 @@ CREATE TABLE IF NOT EXISTS analysis_email ( modifiedtime bigint(20) DEFAULT NULL, UNIQUE (siteId, userId) ); - -INSERT INTO publishment_type(id, content) VALUES -('Kafka', '{"name":"Kafka","type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'), -('Email', '{"name":"Email","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'), -('Slack', '{"name":"Slack","type":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'), -('HBaseStorage', '{"name":"HBaseStorage","type":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'), -('JDBCStorage', '{"name":"JDBCStorage","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEagleStorePlugin","description":null,"fields":[]}'); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java new file mode 100644 index 0000000000..1eb7952903 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java @@ -0,0 +1,7 @@ +package org.apache.eagle.alert.engine.coordinator; + +/** + * Created by qingwzhao on 4/6/17. + */ +public class DeduplicationDefinition { +} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index f7025f26a4..3119ee635f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -26,17 +26,6 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class PublishmentType { private String name; - - @Override - public String toString() { - return "PublishmentType{" - + "name='" + name + '\'' - + ", type='" + type + '\'' - + ", description='" + description + '\'' - + ", fields=" + fields - + '}'; - } - private String type; private String description; private List> fields = new LinkedList<>(); @@ -73,7 +62,15 @@ public void setFields(List> fields) { this.fields = fields; } - + @Override + public String toString() { + return "PublishmentType{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + ", description='" + description + '\'' + + ", fields=" + fields + + '}'; + } @Override public boolean equals(Object obj) { From 6db2bdfe49ae63fb85b849198fa38b825b373450 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:35:23 +0800 Subject: [PATCH 05/14] update policy deletion --- .../metadata/impl/JdbcMetadataDaoImpl.java | 2 +- .../metadata/impl/JdbcMetadataHandler.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index e0b5c9dff3..6427d8cfe0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -227,7 +227,7 @@ public OpResult removeDataSource(String datasourceId) { @Override public OpResult removePolicy(String policyId) { - return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId); + return handler.removePolicyById(PolicyDefinition.class.getSimpleName(), policyId); } @Override diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java index a9e3c5e400..7fffa55c56 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java @@ -455,6 +455,36 @@ public OpResult addPublishmentsToPolicy(String policyId, List publishmen return result; } + public OpResult removePolicyById(String clzName, String policyId) { + Connection connection = null; + PreparedStatement statement = null; + OpResult result = new OpResult(); + try { + String tb = getTableName(clzName); + connection = dataSource.getConnection(); + connection.setAutoCommit(false); + statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb)); + statement.setString(1, policyId); + int status = statement.executeUpdate(); + LOG.info("delete {} policy {} from {}", status, policyId, tb); + closeResource(null, statement, null); + + statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT); + statement.setString(1, policyId); + status = statement.executeUpdate(); + LOG.info("delete {} records from policy_publishment", status); + + connection.commit(); + connection.setAutoCommit(true); + } catch (SQLException e) { + e.printStackTrace(); + } finally { + closeResource(null, statement, connection); + } + LOG.info(result.message); + return result; + } + public OpResult removeById(String clzName, String key) { Connection connection = null; PreparedStatement statement = null; From b447b4dde577ca5f2e9df7e41380d270a40008e6 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 7 Apr 2017 21:50:01 +0800 Subject: [PATCH 06/14] add AlertDeduplication --- .../coordinator/AlertDeduplication.java | 71 +++++++ .../coordinator/DeduplicationDefinition.java | 7 - .../engine/coordinator/PolicyDefinition.java | 13 +- .../publisher/email/AlertEmailGenerator.java | 4 +- .../publisher/impl/AbstractPublishPlugin.java | 17 +- .../publisher/impl/AlertEmailPublisher.java | 2 - .../publisher/impl/AlertPublisherImpl.java | 43 ++--- .../publisher/impl/DefaultDeduplicator.java | 10 +- .../engine/runner/AlertPublisherBolt.java | 17 +- .../mr/history/JHFEventReaderBaseTest.java | 74 ++++++++ .../src/test/resources/application.conf | 74 ++++++++ .../job_1479206441898_508949_conf.xml | 178 ++++++++++++++++-- 12 files changed, 451 insertions(+), 59 deletions(-) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java delete mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java create mode 100644 eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java create mode 100644 eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java new file mode 100644 index 0000000000..78fef7aed3 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.apache.commons.collections.ListUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.List; +import java.util.Objects; + +public class AlertDeduplication { + private String dedupIntervalMin; + private List dedupFields; + + public String getDedupIntervalMin() { + return dedupIntervalMin; + } + + public void setDedupIntervalMin(String dedupIntervalMin) { + this.dedupIntervalMin = dedupIntervalMin; + } + + public List getDedupFields() { + return dedupFields; + } + + public void setDedupFields(List dedupFields) { + this.dedupFields = dedupFields; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(dedupFields) + .append(dedupIntervalMin) + .build(); + } + + @Override + public boolean equals(Object that) { + if (that == this) { + return true; + } + if (!(that instanceof AlertDeduplication)) { + return false; + } + AlertDeduplication another = (AlertDeduplication) that; + if (ListUtils.isEqualList(another.dedupFields, this.dedupFields) + && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) { + return true; + } + return false; + } + + +} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java deleted file mode 100644 index 1eb7952903..0000000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.eagle.alert.engine.coordinator; - -/** - * Created by qingwzhao on 4/6/17. - */ -public class DeduplicationDefinition { -} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index c377e41d43..5004513a41 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -43,6 +43,7 @@ public class PolicyDefinition implements Serializable { private Definition stateDefinition; private PolicyStatus policyStatus = PolicyStatus.ENABLED; private AlertDefinition alertDefinition; + private AlertDeduplication deduplication; // one stream only have one partition in one policy, since we don't support stream alias private List partitionSpec = new ArrayList(); @@ -147,6 +148,7 @@ public int hashCode() { .append(policyStatus) .append(parallelismHint) .append(alertDefinition) + .append(deduplication) .build(); } @@ -172,7 +174,8 @@ public boolean equals(Object that) { && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) && another.policyStatus.equals(this.policyStatus) && another.parallelismHint == this.parallelismHint - && Objects.equals(another.alertDefinition, alertDefinition)) { + && Objects.equals(another.alertDefinition, alertDefinition) + && Objects.equals(another.deduplication, deduplication)) { return true; } return false; @@ -202,6 +205,14 @@ public void setSiteId(String siteId) { this.siteId = siteId; } + public AlertDeduplication getDeduplication() { + return deduplication; + } + + public void setDeduplication(AlertDeduplication deduplication) { + this.deduplication = deduplication; + } + @JsonIgnoreProperties(ignoreUnknown = true) public static class Definition implements Serializable { private static final long serialVersionUID = -622366527887848346L; diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java index 1bcac17f0a..a57941e43f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java @@ -139,7 +139,9 @@ private Map buildAlertContext(AlertStreamEvent event) { alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event)); alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, event.getCategory()); alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, event.getSeverity().toString()); - alertContext.put(PublishConstants.ALERT_EMAIL_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime())); + alertContext.put(PublishConstants.ALERT_EMAIL_TIME, String.format("%s %s", + DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()), + DateTimeUtil.CURRENT_TIME_ZONE.getID())); alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId()); alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy()); alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index b155bb887f..c5c9e040da 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -29,6 +29,7 @@ import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator; import org.slf4j.Logger; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,8 +72,14 @@ public void init(Config config, Publishment publishment, Map conf) throws Except getLogger().error(String.format("initialize extended deduplicator %s failed", spec.getClassName()), t); } } else { - this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(), - publishment.getDedupFields(), publishment.getDedupStateField(), publishment.getDedupStateCloseValue(), dedupCache); + if (publishment.getDedupIntervalMin() != null && !publishment.getDedupIntervalMin().isEmpty()) { + this.deduplicator = new DefaultDeduplicator( + publishment.getDedupIntervalMin(), + publishment.getDedupFields(), + publishment.getDedupStateField(), + publishment.getDedupStateCloseValue(), + dedupCache); + } this.pubName = publishment.getName(); } String serializerClz = publishment.getSerializer(); @@ -98,7 +105,11 @@ public void update(String dedupIntervalMin, Map pluginProperties @Override public List dedup(AlertStreamEvent event) { - return deduplicator.dedup(event); + if (null != deduplicator) { + return deduplicator.dedup(event); + } else { + return Collections.singletonList(event); + } } @Override diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java index 152a9f1a44..f40680cfdd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java @@ -40,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*; import static org.apache.eagle.common.mail.AlertEmailConstants.*; public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { @@ -215,7 +214,6 @@ public PublishmentType getPluginType() { .name("Email") .type(AlertEmailPublisher.class) .description("Email alert publisher") - .field("subject") .field("sender") .field("recipients") .build(); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index 5b902f95a1..e38799f462 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -43,7 +43,10 @@ public class AlertPublisherImpl implements AlertPublisher { private final String name; - private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + // + private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + //private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + private Config config; private Map conf; @@ -73,11 +76,11 @@ public void nextEvent(PublishPartition partition, AlertStreamEvent event) { private void notifyAlert(PublishPartition partition, AlertStreamEvent event) { // remove the column values for publish plugin match partition.getColumnValues().clear(); - if (!publishPluginMapping.containsKey(partition)) { + if (!publishPluginMapping.containsKey(partition.getPublishId())) { LOG.warn("PublishPartition {} is not found in publish plugin map", partition); return; } - AlertPublishPlugin plugin = publishPluginMapping.get(partition); + AlertPublishPlugin plugin = publishPluginMapping.get(partition.getPublishId()); if (plugin == null) { LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition); return; @@ -120,7 +123,7 @@ public synchronized void onPublishChange(List added, } // copy and swap to avoid concurrency issue - Map newPublishMap = new HashMap<>(publishPluginMapping); + Map newPublishMap = new HashMap<>(publishPluginMapping); // added for (Publishment publishment : added) { @@ -128,9 +131,7 @@ public synchronized void onPublishChange(List added, AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (plugin != null) { - for (PublishPartition p : getPublishPartitions(publishment)) { - newPublishMap.put(p, plugin); - } + newPublishMap.put(publishment.getName(), plugin); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -138,16 +139,9 @@ public synchronized void onPublishChange(List added, //removed List toBeClosed = new ArrayList<>(); for (Publishment publishment : removed) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.remove(p); - } else { - newPublishMap.remove(p); - } - } - if (plugin != null) { - toBeClosed.add(plugin); + AlertPublishPlugin publishPlugin = newPublishMap.remove(publishment.getName()); + if (publishPlugin != null) { + toBeClosed.add(publishPlugin); } } // updated @@ -155,16 +149,11 @@ public synchronized void onPublishChange(List added, // for updated publishment, need to init them too AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (newPlugin != null) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.get(p); - } - newPublishMap.put(p, newPlugin); - } - if (plugin != null) { - toBeClosed.add(plugin); + AlertPublishPlugin oldPlugin = newPublishMap.get(publishment.getName()); + if (oldPlugin != null) { + toBeClosed.add(oldPlugin); } + newPublishMap.put(publishment.getName(), newPlugin); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -199,7 +188,7 @@ private void closePlugins(List toBeClosed) { try { p.close(); } catch (Exception e) { - LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e); + LOG.error("Error when close publish plugin {}!", p.getClass().getCanonicalName(), e); } } } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java index ac99db3780..54d551e685 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java @@ -20,6 +20,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.AlertDeduplication; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; @@ -59,6 +60,13 @@ public DefaultDeduplicator(long intervalMin) { this.dedupIntervalSec = intervalMin; } + public DefaultDeduplicator(AlertDeduplication alertDeduplication) { + this.customDedupFields = alertDeduplication.getDedupFields(); + this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60; + this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( + this.dedupIntervalSec, TimeUnit.SECONDS).build(); + } + public DefaultDeduplicator(String intervalMin, List customDedupFields, String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) { setDedupIntervalMin(intervalMin); @@ -81,7 +89,7 @@ public DefaultDeduplicator(String intervalMin, List customDedupFields, * @param key * @return */ - public List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { + private List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { if (StringUtils.isBlank(stateFiledValue)) { // without state field, we cannot determine whether it is duplicated // without custom filed values, we cannot determine whether it is duplicated diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 44a5fe91db..ce46e9c329 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -33,6 +33,7 @@ import org.apache.eagle.alert.engine.publisher.AlertStreamFilter; import org.apache.eagle.alert.engine.publisher.PipeStreamFilter; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; +import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider; import org.apache.eagle.alert.utils.AlertConstants; @@ -43,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener { @@ -51,6 +53,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli private volatile Map cachedPublishments = new HashMap<>(); private volatile Map policyDefinitionMap; private volatile Map streamDefinitionMap; + private volatile Map deduplicatorMap = new ConcurrentHashMap<>(); private AlertTemplateEngine alertTemplateEngine; private boolean logEventEnabled; @@ -87,6 +90,12 @@ public void execute(Tuple input) { if (logEventEnabled) { LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event); } + if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) { + List eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event); + if (eventList == null || eventList.isEmpty()) { + return; + } + } AlertStreamEvent filteredEvent = alertFilter.filter(event); if (filteredEvent != null) { alertPublisher.nextEvent(partition, filteredEvent); @@ -139,7 +148,7 @@ public synchronized void onAlertPublishSpecChange(PublishSpec pubSpec, Map pds, Map sds) { + public synchronized void onAlertPolicyChange(Map pds, Map sds) { List policyToRemove = new ArrayList<>(); if (this.policyDefinitionMap != null) { policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList())); @@ -151,6 +160,9 @@ public void onAlertPolicyChange(Map pds, Map entry : pds.entrySet()) { try { this.alertTemplateEngine.register(entry.getValue()); + if (entry.getValue().getDeduplication() != null) { + this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication())); + } } catch (Throwable throwable) { LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable); } @@ -159,6 +171,9 @@ public void onAlertPolicyChange(Map pds, Map confKeyPatterns = new ArrayList<>(); + confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB); + confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB); + confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB); + confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB); + for (String key : confKeyPatterns) { + builder.includeJobKeyPatterns(Pattern.compile(key)); + } + JobHistoryContentFilter filter = builder.build(); + + MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(ConfigFactory.load()); + Map tags = new HashMap<>(); + tags.put("site", "sandbox"); + tags.put("jobId", "job_1490593856016_152289"); + tags.put("jobType", "HIVE"); + tags.put("jobDefId", "INSERT OVERWRITE TABLE kyl...'2017-04-06')))(Stage-1)"); + JHFMRVer2EventReader reader = new JHFMRVer2EventReader(tags, conf, filter, appConfig); + reader.addListener(new JobConfigurationCreationServiceListener(appConfig.getEagleServiceConfig()) { + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + Assert.assertTrue(null != entity); + Assert.assertTrue(entity instanceof JobConfigurationAPIEntity); + JobConfigurationAPIEntity configurationAPIEntity = (JobConfigurationAPIEntity) entity; + Assert.assertTrue(configurationAPIEntity.getJobConfig().getConfig().size() == 1); + } + }); + reader.parseConfiguration(); + } +} diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf new file mode 100644 index 0000000000..00b14a85ac --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "appId":"mrHistoryJob", + "mode":"LOCAL", + "workers" : 3, + "siteId" : "sandbox", + application.storm.nimbusHost=localhost + + "stormConfig" : { + "mrHistoryJobSpoutTasks" : 6, + "jobKafkaSinkTasks" : 1, + "taskAttemptKafkaSinkTasks" : 1 + }, + + "zookeeper" : { + "zkQuorum" : "sandbox.hortonworks.com:2181", + "zkRoot" : "/test_mrjobhistory", + "zkSessionTimeoutMs" : 15000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 20000 + }, + + "endpointConfig" : { + "timeZone" : "UTC", + "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888", + "basePath" : "/mr-history/done", + "hdfs" : { + fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020", + #if not need, then do not set + # hdfs.kerberos.principal = , + # hdfs.keytab.file = + # .... + } + }, + + "service": { + "host": "localhost", + "port": 9090, + "username": "admin", + "password": "secret", + "readTimeOutSeconds" : 10, + context = "/rest" + }, + + "dataSinkConfig": { + "topic" : "map_reduce_failed_job", + "brokerList" : "sandbox.hortonworks.com:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + "producerType" : "async", + "numBatchMessages" : "4096", + "maxQueueBufferMs" : "5000", + "requestRequiredAcks" : "0" + }, + + "MRConfigureKeys" : { + "jobNameKey" : "eagle.job.name", + "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts" + } +} \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml index 6d22996727..dfccf8e5d3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml @@ -1,18 +1,164 @@ - + + hive.optimize.skewjoin.compiletime + false + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.query.string + + select a.phone_number from customer_details a, call_detail_records b where a.phone_number=b.phone_number + + programatically + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + dfs.blockreport.initialDelay + 120 + hdfs-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.reduce.markreset.buffer.percent + 0.0 + mapred-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + file.client-write-packet-size + 65536 + core-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + + hadoop.http.authentication.simple.anonymous.allowed + + true + core-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.querylog.location + /tmp/hive + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + yarn.timeline-service.leveldb-timeline-store.path + /hadoop/yarn/timeline + yarn-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + yarn.resourcemanager.proxy-user-privileges.enabled + false + yarn-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.exec.script.allow.partial.consumption + false + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.server2.global.init.file.location + /etc/hive/conf + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + + yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms + + 10000 + yarn-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + dfs.datanode.slow.io.warning.threshold.ms + 300 + hdfs-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.support.concurrency + true + file:/etc/hive/conf/hive-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.reduce.shuffle.merge.percent + 0.66 + mapred-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.task.skip.start.attempts + 2 + mapred-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + \ No newline at end of file From fde1d5de40b881ca72b31603f8350804f936eb09 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:34:09 +0800 Subject: [PATCH 07/14] refine publishmentType --- eagle-assembly/src/main/doc/metadata-ddl.sql | 7 ------- .../coordinator/DeduplicationDefinition.java | 7 +++++++ .../engine/coordinator/PublishmentType.java | 21 ++++++++----------- 3 files changed, 16 insertions(+), 19 deletions(-) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java diff --git a/eagle-assembly/src/main/doc/metadata-ddl.sql b/eagle-assembly/src/main/doc/metadata-ddl.sql index 0334623e26..3312576fac 100644 --- a/eagle-assembly/src/main/doc/metadata-ddl.sql +++ b/eagle-assembly/src/main/doc/metadata-ddl.sql @@ -164,10 +164,3 @@ CREATE TABLE IF NOT EXISTS analysis_email ( modifiedtime bigint(20) DEFAULT NULL, UNIQUE (siteId, userId) ); - -INSERT INTO publishment_type(id, content) VALUES -('Kafka', '{"name":"Kafka","type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher","description":null,"fields":[{"name":"kafka_broker","value":"sandbox.hortonworks.com:6667"},{"name":"topic"}]}'), -('Email', '{"name":"Email","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher","description":null,"fields":[{"name":"subject"},{"name":"sender"}, {"name":"recipients"}]}'), -('Slack', '{"name":"Slack","type":"org.apache.eagle.alert.engine.publisher.impl.AlertSlackPublisher","description":null,"fields":[{"name":"token"},{"name":"channels"}, {"name":"severitys"}, {"name":"urltemplate"}]}'), -('HBaseStorage', '{"name":"HBaseStorage","type":"org.apache.eagle.alert.app.AlertEagleStorePlugin","description":null,"fields":[]}'), -('JDBCStorage', '{"name":"JDBCStorage","type":"org.apache.eagle.alert.engine.publisher.impl.AlertEagleStorePlugin","description":null,"fields":[]}'); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java new file mode 100644 index 0000000000..1eb7952903 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java @@ -0,0 +1,7 @@ +package org.apache.eagle.alert.engine.coordinator; + +/** + * Created by qingwzhao on 4/6/17. + */ +public class DeduplicationDefinition { +} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java index f7025f26a4..3119ee635f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PublishmentType.java @@ -26,17 +26,6 @@ @JsonIgnoreProperties(ignoreUnknown = true) public class PublishmentType { private String name; - - @Override - public String toString() { - return "PublishmentType{" - + "name='" + name + '\'' - + ", type='" + type + '\'' - + ", description='" + description + '\'' - + ", fields=" + fields - + '}'; - } - private String type; private String description; private List> fields = new LinkedList<>(); @@ -73,7 +62,15 @@ public void setFields(List> fields) { this.fields = fields; } - + @Override + public String toString() { + return "PublishmentType{" + + "name='" + name + '\'' + + ", type='" + type + '\'' + + ", description='" + description + '\'' + + ", fields=" + fields + + '}'; + } @Override public boolean equals(Object obj) { From 784a8c97785a5d4b470216e4b638a46cbbfabb42 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:35:23 +0800 Subject: [PATCH 08/14] update policy deletion --- .../metadata/impl/JdbcMetadataDaoImpl.java | 2 +- .../metadata/impl/JdbcMetadataHandler.java | 30 +++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java index e0b5c9dff3..6427d8cfe0 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataDaoImpl.java @@ -227,7 +227,7 @@ public OpResult removeDataSource(String datasourceId) { @Override public OpResult removePolicy(String policyId) { - return handler.removeById(PolicyDefinition.class.getSimpleName(), policyId); + return handler.removePolicyById(PolicyDefinition.class.getSimpleName(), policyId); } @Override diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java index a9e3c5e400..7fffa55c56 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/JdbcMetadataHandler.java @@ -455,6 +455,36 @@ public OpResult addPublishmentsToPolicy(String policyId, List publishmen return result; } + public OpResult removePolicyById(String clzName, String policyId) { + Connection connection = null; + PreparedStatement statement = null; + OpResult result = new OpResult(); + try { + String tb = getTableName(clzName); + connection = dataSource.getConnection(); + connection.setAutoCommit(false); + statement = connection.prepareStatement(String.format(DELETE_STATEMENT, tb)); + statement.setString(1, policyId); + int status = statement.executeUpdate(); + LOG.info("delete {} policy {} from {}", status, policyId, tb); + closeResource(null, statement, null); + + statement = connection.prepareStatement(DELETE_PUBLISHMENT_STATEMENT); + statement.setString(1, policyId); + status = statement.executeUpdate(); + LOG.info("delete {} records from policy_publishment", status); + + connection.commit(); + connection.setAutoCommit(true); + } catch (SQLException e) { + e.printStackTrace(); + } finally { + closeResource(null, statement, connection); + } + LOG.info(result.message); + return result; + } + public OpResult removeById(String clzName, String key) { Connection connection = null; PreparedStatement statement = null; From 8572b9e3695d1aa520368ba80bf150235751e33e Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 7 Apr 2017 21:50:01 +0800 Subject: [PATCH 09/14] add AlertDeduplication --- .../coordinator/AlertDeduplication.java | 71 +++++++ .../coordinator/DeduplicationDefinition.java | 7 - .../engine/coordinator/PolicyDefinition.java | 13 +- .../publisher/email/AlertEmailGenerator.java | 4 +- .../publisher/impl/AbstractPublishPlugin.java | 17 +- .../publisher/impl/AlertEmailPublisher.java | 2 - .../publisher/impl/AlertPublisherImpl.java | 43 ++--- .../publisher/impl/DefaultDeduplicator.java | 10 +- .../engine/runner/AlertPublisherBolt.java | 17 +- .../mr/history/JHFEventReaderBaseTest.java | 74 ++++++++ .../src/test/resources/application.conf | 74 ++++++++ .../job_1479206441898_508949_conf.xml | 178 ++++++++++++++++-- 12 files changed, 451 insertions(+), 59 deletions(-) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java delete mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java create mode 100644 eagle-jpm/eagle-jpm-mr-history/src/test/java/org/apache/eagle/jpm/mr/history/JHFEventReaderBaseTest.java create mode 100644 eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java new file mode 100644 index 0000000000..78fef7aed3 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDeduplication.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.eagle.alert.engine.coordinator; + +import org.apache.commons.collections.ListUtils; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +import java.util.List; +import java.util.Objects; + +public class AlertDeduplication { + private String dedupIntervalMin; + private List dedupFields; + + public String getDedupIntervalMin() { + return dedupIntervalMin; + } + + public void setDedupIntervalMin(String dedupIntervalMin) { + this.dedupIntervalMin = dedupIntervalMin; + } + + public List getDedupFields() { + return dedupFields; + } + + public void setDedupFields(List dedupFields) { + this.dedupFields = dedupFields; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(dedupFields) + .append(dedupIntervalMin) + .build(); + } + + @Override + public boolean equals(Object that) { + if (that == this) { + return true; + } + if (!(that instanceof AlertDeduplication)) { + return false; + } + AlertDeduplication another = (AlertDeduplication) that; + if (ListUtils.isEqualList(another.dedupFields, this.dedupFields) + && Objects.equals(another.dedupIntervalMin, this.dedupIntervalMin)) { + return true; + } + return false; + } + + +} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java deleted file mode 100644 index 1eb7952903..0000000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.eagle.alert.engine.coordinator; - -/** - * Created by qingwzhao on 4/6/17. - */ -public class DeduplicationDefinition { -} diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java index c377e41d43..5004513a41 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java @@ -43,6 +43,7 @@ public class PolicyDefinition implements Serializable { private Definition stateDefinition; private PolicyStatus policyStatus = PolicyStatus.ENABLED; private AlertDefinition alertDefinition; + private AlertDeduplication deduplication; // one stream only have one partition in one policy, since we don't support stream alias private List partitionSpec = new ArrayList(); @@ -147,6 +148,7 @@ public int hashCode() { .append(policyStatus) .append(parallelismHint) .append(alertDefinition) + .append(deduplication) .build(); } @@ -172,7 +174,8 @@ public boolean equals(Object that) { && CollectionUtils.isEqualCollection(another.partitionSpec, this.partitionSpec) && another.policyStatus.equals(this.policyStatus) && another.parallelismHint == this.parallelismHint - && Objects.equals(another.alertDefinition, alertDefinition)) { + && Objects.equals(another.alertDefinition, alertDefinition) + && Objects.equals(another.deduplication, deduplication)) { return true; } return false; @@ -202,6 +205,14 @@ public void setSiteId(String siteId) { this.siteId = siteId; } + public AlertDeduplication getDeduplication() { + return deduplication; + } + + public void setDeduplication(AlertDeduplication deduplication) { + this.deduplication = deduplication; + } + @JsonIgnoreProperties(ignoreUnknown = true) public static class Definition implements Serializable { private static final long serialVersionUID = -622366527887848346L; diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java index 1bcac17f0a..a57941e43f 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java @@ -139,7 +139,9 @@ private Map buildAlertContext(AlertStreamEvent event) { alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event)); alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, event.getCategory()); alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, event.getSeverity().toString()); - alertContext.put(PublishConstants.ALERT_EMAIL_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime())); + alertContext.put(PublishConstants.ALERT_EMAIL_TIME, String.format("%s %s", + DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()), + DateTimeUtil.CURRENT_TIME_ZONE.getID())); alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId()); alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy()); alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java index b155bb887f..c5c9e040da 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AbstractPublishPlugin.java @@ -29,6 +29,7 @@ import org.apache.eagle.alert.engine.publisher.dedup.ExtendedDeduplicator; import org.slf4j.Logger; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -71,8 +72,14 @@ public void init(Config config, Publishment publishment, Map conf) throws Except getLogger().error(String.format("initialize extended deduplicator %s failed", spec.getClassName()), t); } } else { - this.deduplicator = new DefaultDeduplicator(publishment.getDedupIntervalMin(), - publishment.getDedupFields(), publishment.getDedupStateField(), publishment.getDedupStateCloseValue(), dedupCache); + if (publishment.getDedupIntervalMin() != null && !publishment.getDedupIntervalMin().isEmpty()) { + this.deduplicator = new DefaultDeduplicator( + publishment.getDedupIntervalMin(), + publishment.getDedupFields(), + publishment.getDedupStateField(), + publishment.getDedupStateCloseValue(), + dedupCache); + } this.pubName = publishment.getName(); } String serializerClz = publishment.getSerializer(); @@ -98,7 +105,11 @@ public void update(String dedupIntervalMin, Map pluginProperties @Override public List dedup(AlertStreamEvent event) { - return deduplicator.dedup(event); + if (null != deduplicator) { + return deduplicator.dedup(event); + } else { + return Collections.singletonList(event); + } } @Override diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java index 152a9f1a44..f40680cfdd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java @@ -40,7 +40,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.apache.eagle.alert.service.MetadataServiceClientImpl.*; import static org.apache.eagle.common.mail.AlertEmailConstants.*; public class AlertEmailPublisher extends AbstractPublishPlugin implements AlertPublishPluginProvider { @@ -215,7 +214,6 @@ public PublishmentType getPluginType() { .name("Email") .type(AlertEmailPublisher.class) .description("Email alert publisher") - .field("subject") .field("sender") .field("recipients") .build(); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java index 5b902f95a1..e38799f462 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java @@ -43,7 +43,10 @@ public class AlertPublisherImpl implements AlertPublisher { private final String name; - private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + // + private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + //private volatile Map publishPluginMapping = new ConcurrentHashMap<>(1); + private Config config; private Map conf; @@ -73,11 +76,11 @@ public void nextEvent(PublishPartition partition, AlertStreamEvent event) { private void notifyAlert(PublishPartition partition, AlertStreamEvent event) { // remove the column values for publish plugin match partition.getColumnValues().clear(); - if (!publishPluginMapping.containsKey(partition)) { + if (!publishPluginMapping.containsKey(partition.getPublishId())) { LOG.warn("PublishPartition {} is not found in publish plugin map", partition); return; } - AlertPublishPlugin plugin = publishPluginMapping.get(partition); + AlertPublishPlugin plugin = publishPluginMapping.get(partition.getPublishId()); if (plugin == null) { LOG.warn("PublishPartition {} has problems while initializing publish plugin", partition); return; @@ -120,7 +123,7 @@ public synchronized void onPublishChange(List added, } // copy and swap to avoid concurrency issue - Map newPublishMap = new HashMap<>(publishPluginMapping); + Map newPublishMap = new HashMap<>(publishPluginMapping); // added for (Publishment publishment : added) { @@ -128,9 +131,7 @@ public synchronized void onPublishChange(List added, AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (plugin != null) { - for (PublishPartition p : getPublishPartitions(publishment)) { - newPublishMap.put(p, plugin); - } + newPublishMap.put(publishment.getName(), plugin); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -138,16 +139,9 @@ public synchronized void onPublishChange(List added, //removed List toBeClosed = new ArrayList<>(); for (Publishment publishment : removed) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.remove(p); - } else { - newPublishMap.remove(p); - } - } - if (plugin != null) { - toBeClosed.add(plugin); + AlertPublishPlugin publishPlugin = newPublishMap.remove(publishment.getName()); + if (publishPlugin != null) { + toBeClosed.add(publishPlugin); } } // updated @@ -155,16 +149,11 @@ public synchronized void onPublishChange(List added, // for updated publishment, need to init them too AlertPublishPlugin newPlugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf); if (newPlugin != null) { - AlertPublishPlugin plugin = null; - for (PublishPartition p : getPublishPartitions(publishment)) { - if (plugin == null) { - plugin = newPublishMap.get(p); - } - newPublishMap.put(p, newPlugin); - } - if (plugin != null) { - toBeClosed.add(plugin); + AlertPublishPlugin oldPlugin = newPublishMap.get(publishment.getName()); + if (oldPlugin != null) { + toBeClosed.add(oldPlugin); } + newPublishMap.put(publishment.getName(), newPlugin); } else { LOG.error("OnPublishChange alertPublisher {} failed due to invalid format", publishment); } @@ -199,7 +188,7 @@ private void closePlugins(List toBeClosed) { try { p.close(); } catch (Exception e) { - LOG.error(String.format("Error when close publish plugin {}!", p.getClass().getCanonicalName()), e); + LOG.error("Error when close publish plugin {}!", p.getClass().getCanonicalName(), e); } } } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java index ac99db3780..54d551e685 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/DefaultDeduplicator.java @@ -20,6 +20,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.commons.lang3.StringUtils; +import org.apache.eagle.alert.engine.coordinator.AlertDeduplication; import org.apache.eagle.alert.engine.coordinator.StreamDefinition; import org.apache.eagle.alert.engine.model.AlertStreamEvent; import org.apache.eagle.alert.engine.publisher.AlertDeduplicator; @@ -59,6 +60,13 @@ public DefaultDeduplicator(long intervalMin) { this.dedupIntervalSec = intervalMin; } + public DefaultDeduplicator(AlertDeduplication alertDeduplication) { + this.customDedupFields = alertDeduplication.getDedupFields(); + this.dedupIntervalSec = Integer.parseInt(alertDeduplication.getDedupIntervalMin()) * 60; + this.withoutStatesCache = CacheBuilder.newBuilder().expireAfterWrite( + this.dedupIntervalSec, TimeUnit.SECONDS).build(); + } + public DefaultDeduplicator(String intervalMin, List customDedupFields, String dedupStateField, String dedupStateCloseValue, DedupCache dedupCache) { setDedupIntervalMin(intervalMin); @@ -81,7 +89,7 @@ public DefaultDeduplicator(String intervalMin, List customDedupFields, * @param key * @return */ - public List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { + private List checkDedup(AlertStreamEvent event, EventUniq key, String stateFiledValue) { if (StringUtils.isBlank(stateFiledValue)) { // without state field, we cannot determine whether it is duplicated // without custom filed values, we cannot determine whether it is duplicated diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index 44a5fe91db..ce46e9c329 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -33,6 +33,7 @@ import org.apache.eagle.alert.engine.publisher.AlertStreamFilter; import org.apache.eagle.alert.engine.publisher.PipeStreamFilter; import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl; +import org.apache.eagle.alert.engine.publisher.impl.DefaultDeduplicator; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine; import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider; import org.apache.eagle.alert.utils.AlertConstants; @@ -43,6 +44,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener { @@ -51,6 +53,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli private volatile Map cachedPublishments = new HashMap<>(); private volatile Map policyDefinitionMap; private volatile Map streamDefinitionMap; + private volatile Map deduplicatorMap = new ConcurrentHashMap<>(); private AlertTemplateEngine alertTemplateEngine; private boolean logEventEnabled; @@ -87,6 +90,12 @@ public void execute(Tuple input) { if (logEventEnabled) { LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event); } + if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) { + List eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event); + if (eventList == null || eventList.isEmpty()) { + return; + } + } AlertStreamEvent filteredEvent = alertFilter.filter(event); if (filteredEvent != null) { alertPublisher.nextEvent(partition, filteredEvent); @@ -139,7 +148,7 @@ public synchronized void onAlertPublishSpecChange(PublishSpec pubSpec, Map pds, Map sds) { + public synchronized void onAlertPolicyChange(Map pds, Map sds) { List policyToRemove = new ArrayList<>(); if (this.policyDefinitionMap != null) { policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList())); @@ -151,6 +160,9 @@ public void onAlertPolicyChange(Map pds, Map entry : pds.entrySet()) { try { this.alertTemplateEngine.register(entry.getValue()); + if (entry.getValue().getDeduplication() != null) { + this.deduplicatorMap.put(entry.getKey(), new DefaultDeduplicator(entry.getValue().getDeduplication())); + } } catch (Throwable throwable) { LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable); } @@ -159,6 +171,9 @@ public void onAlertPolicyChange(Map pds, Map confKeyPatterns = new ArrayList<>(); + confKeyPatterns.add(Constants.JobConfiguration.CASCADING_JOB); + confKeyPatterns.add(Constants.JobConfiguration.HIVE_JOB); + confKeyPatterns.add(Constants.JobConfiguration.PIG_JOB); + confKeyPatterns.add(Constants.JobConfiguration.SCOOBI_JOB); + for (String key : confKeyPatterns) { + builder.includeJobKeyPatterns(Pattern.compile(key)); + } + JobHistoryContentFilter filter = builder.build(); + + MRHistoryJobConfig appConfig = MRHistoryJobConfig.newInstance(ConfigFactory.load()); + Map tags = new HashMap<>(); + tags.put("site", "sandbox"); + tags.put("jobId", "job_1490593856016_152289"); + tags.put("jobType", "HIVE"); + tags.put("jobDefId", "INSERT OVERWRITE TABLE kyl...'2017-04-06')))(Stage-1)"); + JHFMRVer2EventReader reader = new JHFMRVer2EventReader(tags, conf, filter, appConfig); + reader.addListener(new JobConfigurationCreationServiceListener(appConfig.getEagleServiceConfig()) { + @Override + public void jobEntityCreated(JobBaseAPIEntity entity) throws Exception { + Assert.assertTrue(null != entity); + Assert.assertTrue(entity instanceof JobConfigurationAPIEntity); + JobConfigurationAPIEntity configurationAPIEntity = (JobConfigurationAPIEntity) entity; + Assert.assertTrue(configurationAPIEntity.getJobConfig().getConfig().size() == 1); + } + }); + reader.parseConfiguration(); + } +} diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf new file mode 100644 index 0000000000..00b14a85ac --- /dev/null +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/application.conf @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +{ + "appId":"mrHistoryJob", + "mode":"LOCAL", + "workers" : 3, + "siteId" : "sandbox", + application.storm.nimbusHost=localhost + + "stormConfig" : { + "mrHistoryJobSpoutTasks" : 6, + "jobKafkaSinkTasks" : 1, + "taskAttemptKafkaSinkTasks" : 1 + }, + + "zookeeper" : { + "zkQuorum" : "sandbox.hortonworks.com:2181", + "zkRoot" : "/test_mrjobhistory", + "zkSessionTimeoutMs" : 15000, + "zkRetryTimes" : 3, + "zkRetryInterval" : 20000 + }, + + "endpointConfig" : { + "timeZone" : "UTC", + "mrHistoryServerUrl" : "http://sandbox.hortonworks.com:19888", + "basePath" : "/mr-history/done", + "hdfs" : { + fs.defaultFS : "hdfs://sandbox.hortonworks.com:8020", + #if not need, then do not set + # hdfs.kerberos.principal = , + # hdfs.keytab.file = + # .... + } + }, + + "service": { + "host": "localhost", + "port": 9090, + "username": "admin", + "password": "secret", + "readTimeOutSeconds" : 10, + context = "/rest" + }, + + "dataSinkConfig": { + "topic" : "map_reduce_failed_job", + "brokerList" : "sandbox.hortonworks.com:6667", + "serializerClass" : "kafka.serializer.StringEncoder", + "keySerializerClass" : "kafka.serializer.StringEncoder" + "producerType" : "async", + "numBatchMessages" : "4096", + "maxQueueBufferMs" : "5000", + "requestRequiredAcks" : "0" + }, + + "MRConfigureKeys" : { + "jobNameKey" : "eagle.job.name", + "jobConfigKey" : "mapreduce.map.output.compress,mapreduce.map.output.compress.codec,mapreduce.output.fileoutputformat.compress,mapreduce.output.fileoutputformat.compress.type,mapreduce.output.fileoutputformat.compress.codec,mapred.output.format.class, dataplatform.etl.info,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.java.opts,mapreduce.reduce.java.opts" + } +} \ No newline at end of file diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml index 6d22996727..dfccf8e5d3 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml @@ -1,18 +1,164 @@ - + + hive.optimize.skewjoin.compiletime + false + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.query.string + + select a.phone_number from customer_details a, call_detail_records b where a.phone_number=b.phone_number + + programatically + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + dfs.blockreport.initialDelay + 120 + hdfs-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.reduce.markreset.buffer.percent + 0.0 + mapred-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + file.client-write-packet-size + 65536 + core-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + + hadoop.http.authentication.simple.anonymous.allowed + + true + core-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.querylog.location + /tmp/hive + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + yarn.timeline-service.leveldb-timeline-store.path + /hadoop/yarn/timeline + yarn-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + yarn.resourcemanager.proxy-user-privileges.enabled + false + yarn-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.exec.script.allow.partial.consumption + false + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.server2.global.init.file.location + /etc/hive/conf + programatically + + org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@67236f24 + + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + + yarn.resourcemanager.work-preserving-recovery.scheduling-wait-ms + + 10000 + yarn-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + dfs.datanode.slow.io.warning.threshold.ms + 300 + hdfs-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + hive.support.concurrency + true + file:/etc/hive/conf/hive-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.reduce.shuffle.merge.percent + 0.66 + mapred-site.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + + + mapreduce.task.skip.start.attempts + 2 + mapred-default.xml + job.xml + + hdfs://sandbox.hortonworks.com:8020/mr-history/done/2017/04/07/000000/job_1491539903031_0002_conf.xml + + \ No newline at end of file From 453ff5b6b75d76278d8d2a28121891711a2039de Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 6 Apr 2017 19:34:09 +0800 Subject: [PATCH 10/14] refine publishmentType --- .../alert/engine/coordinator/DeduplicationDefinition.java | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java new file mode 100644 index 0000000000..1eb7952903 --- /dev/null +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java @@ -0,0 +1,7 @@ +package org.apache.eagle.alert.engine.coordinator; + +/** + * Created by qingwzhao on 4/6/17. + */ +public class DeduplicationDefinition { +} From 9919041e99908b26fc85f68548605c6862657dd6 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 7 Apr 2017 21:50:01 +0800 Subject: [PATCH 11/14] add AlertDeduplication --- .../alert/engine/coordinator/DeduplicationDefinition.java | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java deleted file mode 100644 index 1eb7952903..0000000000 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/DeduplicationDefinition.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.apache.eagle.alert.engine.coordinator; - -/** - * Created by qingwzhao on 4/6/17. - */ -public class DeduplicationDefinition { -} From 7c1b57d7d071c7fc20b61cc64a9379650ee5b039 Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 21 Apr 2017 11:28:04 +0800 Subject: [PATCH 12/14] add missing license --- .../src/assembly/alert-assembly.xml | 3 ++- .../template/VelocityAlertTemplateEngine.java | 8 ++++++-- .../src/main/resources/ALERT_INLINED_TEMPLATE.vm | 2 +- .../resources/job_1479206441898_508949_conf.xml | 15 ++++++++++++++- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml index 9f25ec0def..b361f99b62 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-assembly/src/assembly/alert-assembly.xml @@ -9,7 +9,8 @@ distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the License for the specific language governing permissions and ~ limitations - under the License. --> + under the License. +--> diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java index 87a067f278..c0b765afae 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java @@ -151,9 +151,13 @@ private static VelocityContext buildAlertContext(PolicyDefinition policyDefiniti context.put(AlertContextFields.ALERT_ID, event.getAlertId()); context.put(AlertContextFields.CREATED_BY, event.getCreatedBy()); context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime()); - context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime())); + context.put(AlertContextFields.CREATED_TIME, String.format("%s %s", + DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()), + DateTimeUtil.CURRENT_TIME_ZONE.getID())); context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp()); - context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp())); + context.put(AlertContextFields.ALERT_TIME, String.format("%s %s", + DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()), + DateTimeUtil.CURRENT_TIME_ZONE.getID())); context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema()); context.put(AlertContextFields.ALERT_EVENT, event); diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm index 0e3d5feed8..d63a36dd95 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm @@ -145,7 +145,7 @@ - CATEGORY: #if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end TIME: $alert["alertTime"] + CATEGORY: #if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end EVENT TIME: $alert["alertTime"] diff --git a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml index dfccf8e5d3..b670a3ff28 100644 --- a/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml +++ b/eagle-jpm/eagle-jpm-mr-history/src/test/resources/job_1479206441898_508949_conf.xml @@ -1,4 +1,17 @@ - + + + hive.optimize.skewjoin.compiletime false From ca331653118be9b0317fa0d5235430cf573d8a7c Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Fri, 21 Apr 2017 11:34:17 +0800 Subject: [PATCH 13/14] update ALERT_INLINED_TEMPLATE.vm --- .../alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm index d63a36dd95..70013c3809 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm @@ -145,7 +145,7 @@ - CATEGORY: #if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end EVENT TIME: $alert["alertTime"] + CATEGORY: #if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end CREATE TIME: $alert["alertTime"] From a933ed07d7f1f041c40f8e71bde665c992fcf8af Mon Sep 17 00:00:00 2001 From: "Zhao, Qingwen" Date: Thu, 27 Apr 2017 22:11:05 +0800 Subject: [PATCH 14/14] fix VelocityAlertTemplateEngineTest.java --- .../apache/eagle/alert/engine/runner/AlertPublisherBolt.java | 1 + .../publisher/template/VelocityAlertTemplateEngineTest.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java index ce46e9c329..d6829d6caf 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java @@ -93,6 +93,7 @@ public void execute(Tuple input) { if (deduplicatorMap != null && deduplicatorMap.containsKey(event.getPolicyId())) { List eventList = deduplicatorMap.get(event.getPolicyId()).dedup(event); if (eventList == null || eventList.isEmpty()) { + collector.ack(input); return; } } diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java index 7b1d494a40..e5ec4741dd 100644 --- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java +++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java @@ -36,7 +36,7 @@ public void testVelocityAlertTemplate () { templateEngine.init(ConfigFactory.load()); templateEngine.register(mockPolicy("testPolicy")); AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicy")); - Assert.assertEquals("Alert (2016-11-30 07:31:15): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " + + Assert.assertEquals("Alert (2016-11-30 07:31:15 UTC): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " + "exceeding thread hold: 90%. (policy: testPolicy, description: Policy for monitoring cpu usage > 90%), " + "definition: from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " + "select site, metric, host, role, value insert into capacityUsageAlert", event.getBody());