Skip to content

Commit

Permalink
TE: Not sending repeated mail for merged anomaly (#456)
Browse files Browse the repository at this point in the history
  • Loading branch information
puneetjaiswal committed Aug 29, 2016
1 parent d37c378 commit c642a09
Show file tree
Hide file tree
Showing 11 changed files with 38 additions and 217 deletions.
Expand Up @@ -4,7 +4,6 @@

import com.linkedin.thirdeye.db.entity.AnomalyMergedResult;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URLEncoder;
Expand Down Expand Up @@ -104,7 +103,7 @@ private void run() throws Exception {
final String collectionAlias = ThirdEyeUtils.getAliasFromCollection(collection);

// Get the anomalies in that range
final List<AnomalyMergedResult> results = anomalyMergedResultDAO.getAllByTimeEmailId(windowStart.getMillis(), windowEnd.getMillis(), alertConfig.getId());
final List<AnomalyMergedResult> results = anomalyMergedResultDAO.getAllByTimeEmailIdAndNotifiedFalse(windowStart.getMillis(), windowEnd.getMillis(), alertConfig.getId());

if (results.isEmpty() && !alertConfig.getSendZeroAnomalyEmail()) {
LOG.info("Zero anomalies found, skipping sending email");
Expand Down Expand Up @@ -144,6 +143,7 @@ private void run() throws Exception {
}

sendAlertForAnomalies(collectionAlias, results, groupedResults, dimensionNames);
updateNotifiedStatus(results);
}

private void sendAlertForAnomalies(String collectionAlias, List<AnomalyMergedResult> results,
Expand All @@ -156,7 +156,7 @@ private void sendAlertForAnomalies(String collectionAlias, List<AnomalyMergedRes
// Render template - create email first so we can get embedded image string
HtmlEmail email = new HtmlEmail();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// File chartFile = null;
// File chartFile = null;
try (Writer out = new OutputStreamWriter(baos, CHARSET)) {
Configuration freemarkerConfig = new Configuration(Configuration.VERSION_2_3_21);
freemarkerConfig.setClassForTemplateLoading(getClass(), "/com/linkedin/thirdeye/detector/");
Expand Down Expand Up @@ -211,6 +211,13 @@ private void sendAlertForAnomalies(String collectionAlias, List<AnomalyMergedRes
LOG.info("Sent email with {} anomalies! {}", results.size(), alertConfig);
}

private void updateNotifiedStatus(List<AnomalyMergedResult> mergedResults) {
for (AnomalyMergedResult mergedResult : mergedResults) {
mergedResult.setNotified(true);
anomalyMergedResultDAO.update(mergedResult);
}
}

private class AssignedDimensionsMethod implements TemplateMethodModelEx {
private static final String UNASSIGNED_DIMENSION_VALUE = "*";
private static final String DIMENSION_VALUE_SEPARATOR = ",";
Expand Down
Expand Up @@ -7,7 +7,6 @@
import com.linkedin.thirdeye.db.dao.AnomalyResultDAO;
import com.linkedin.thirdeye.db.dao.AnomalyTaskDAO;
import com.linkedin.thirdeye.db.dao.EmailConfigurationDAO;
import com.linkedin.thirdeye.db.dao.AnomalyFunctionRelationDAO;
import com.linkedin.thirdeye.db.dao.WebappConfigDAO;

import io.dropwizard.Application;
Expand All @@ -25,7 +24,6 @@ public abstract class BaseThirdEyeApplication<T extends Configuration> extends A
protected EmailConfigurationDAO emailConfigurationDAO;
protected AnomalyJobDAO anomalyJobDAO;
protected AnomalyTaskDAO anomalyTaskDAO;
protected AnomalyFunctionRelationDAO anomalyFunctionRelationDAO;
protected WebappConfigDAO webappConfigDAO;
protected AnomalyMergedResultDAO anomalyMergedResultDAO;

Expand All @@ -36,7 +34,6 @@ public void initDAOs() {
anomalyFunctionDAO = PersistenceUtil.getInstance(AnomalyFunctionDAO.class);
anomalyResultDAO = PersistenceUtil.getInstance(AnomalyResultDAO.class);
emailConfigurationDAO = PersistenceUtil.getInstance(EmailConfigurationDAO.class);
anomalyFunctionRelationDAO = PersistenceUtil.getInstance(AnomalyFunctionRelationDAO.class);
anomalyJobDAO = PersistenceUtil.getInstance(AnomalyJobDAO.class);
anomalyTaskDAO = PersistenceUtil.getInstance(AnomalyTaskDAO.class);
webappConfigDAO = PersistenceUtil.getInstance(WebappConfigDAO.class);
Expand Down

This file was deleted.

Expand Up @@ -40,9 +40,9 @@ public class AnomalyMergedResultDAO extends AbstractJpaDAO<AnomalyMergedResult>
private static final String FIND_BY_TIME =
"from AnomalyMergedResult r WHERE (r.startTime < :endTime and r.endTime > :startTime) order by r.endTime desc ";

private static final String FIND_BY_TIME_EMAIL =
private static final String FIND_BY_TIME_EMAIL_NOTIFIED_FALSE =
"SELECT r FROM EmailConfiguration d JOIN d.functions f, AnomalyMergedResult r "
+ "WHERE r.function.id=f.id AND d.id = :emailId "
+ "WHERE r.function.id=f.id AND d.id = :emailId and r.notified=false "
+ "and (r.startTime < :endTime and r.endTime > :startTime) order by r.endTime desc ";

public AnomalyMergedResultDAO() {
Expand All @@ -56,8 +56,8 @@ public List<AnomalyMergedResult> getAllByTime(long startTime, long endTime) {
}

@Transactional
public List<AnomalyMergedResult> getAllByTimeEmailId(long startTime, long endTime, long emailId) {
return getEntityManager().createQuery(FIND_BY_TIME_EMAIL, entityClass)
public List<AnomalyMergedResult> getAllByTimeEmailIdAndNotifiedFalse(long startTime, long endTime, long emailId) {
return getEntityManager().createQuery(FIND_BY_TIME_EMAIL_NOTIFIED_FALSE, entityClass)
.setParameter("emailId", emailId).setParameter("startTime", startTime)
.setParameter("endTime", endTime).getResultList();
}
Expand Down

This file was deleted.

Expand Up @@ -50,6 +50,9 @@ public class AnomalyMergedResult extends AbstractBaseEntity implements Comparabl
@Column(name = "message")
private String message;

@Column(name = "notified")
private boolean notified;

@OneToOne(cascade = CascadeType.ALL, fetch = FetchType.EAGER, orphanRemoval = true)
@JoinColumn(name = "anomaly_feedback_id")
private AnomalyFeedback feedback;
Expand Down Expand Up @@ -127,6 +130,14 @@ public void setAnomalyResults(List<AnomalyResult> anomalyResults) {
this.anomalyResults = anomalyResults;
}

public boolean isNotified() {
return notified;
}

public void setNotified(boolean notified) {
this.notified = notified;
}

public String getCollection() {
return collection;
}
Expand Down
Expand Up @@ -36,7 +36,6 @@
import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;
import com.linkedin.thirdeye.detector.lib.util.JobUtils;
import com.linkedin.thirdeye.detector.resources.AnomalyDetectionJobResource;
import com.linkedin.thirdeye.detector.resources.AnomalyFunctionRelationResource;
import com.linkedin.thirdeye.detector.resources.AnomalyFunctionSpecResource;
import com.linkedin.thirdeye.detector.resources.AnomalyResultResource;
import com.linkedin.thirdeye.detector.resources.EmailFunctionDependencyResource;
Expand Down Expand Up @@ -112,9 +111,9 @@ public void stop() throws Exception {
new AnomalyFunctionFactory(config.getFunctionConfigPath());

// ThirdEye driver
final AnomalyDetectionJobManager anomalyDetectionJobManager = new AnomalyDetectionJobManager(quartzScheduler,
anomalyFunctionDAO, anomalyFunctionRelationDAO, anomalyResultDAO,
environment.metrics(), anomalyFunctionFactory, config.getFailureEmailConfig());
final AnomalyDetectionJobManager anomalyDetectionJobManager =
new AnomalyDetectionJobManager(quartzScheduler, anomalyFunctionDAO, anomalyResultDAO,
environment.metrics(), anomalyFunctionFactory, config.getFailureEmailConfig());

// Start all active jobs on startup
environment.lifecycle().manage(new Managed() {
Expand Down Expand Up @@ -216,7 +215,6 @@ public void serverStarted(Server server) {

// Jersey resources
environment.jersey().register(new AnomalyFunctionSpecResource(anomalyFunctionDAO));
environment.jersey().register(new AnomalyFunctionRelationResource(anomalyFunctionRelationDAO));
environment.jersey().register(new AnomalyResultResource(anomalyResultDAO));
environment.jersey().register(new MetricsGraphicsTimeSeriesResource(anomalyResultDAO));
environment.jersey().register(new AnomalyDetectionJobResource(anomalyDetectionJobManager, anomalyFunctionDAO));
Expand Down
Expand Up @@ -38,10 +38,8 @@
import com.linkedin.thirdeye.client.timeseries.TimeSeriesResponse;
import com.linkedin.thirdeye.client.timeseries.TimeSeriesResponseConverter;
import com.linkedin.thirdeye.dashboard.Utils;
import com.linkedin.thirdeye.db.entity.AnomalyFunctionRelation;
import com.linkedin.thirdeye.db.entity.AnomalyFunctionSpec;
import com.linkedin.thirdeye.db.entity.AnomalyResult;
import com.linkedin.thirdeye.db.dao.AnomalyFunctionRelationDAO;
import com.linkedin.thirdeye.detector.function.AnomalyFunction;
import com.linkedin.thirdeye.detector.lib.util.JobUtils;

Expand All @@ -54,13 +52,11 @@ public class AnomalyDetectionJob implements Job {
public static final String WINDOW_END = "WINDOW_END";
public static final String WINDOW_START = "WINDOW_START";
public static final String METRIC_REGISTRY = "METRIC_REGISTRY";
public static final String RELATION_DAO = "RELATION_DAO";

private AnomalyFunction anomalyFunction;
private TimeSeriesHandler timeSeriesHandler;
private TimeSeriesResponseConverter timeSeriesResponseConverter;
private AnomalyResultDAO resultDAO;
private AnomalyFunctionRelationDAO relationDAO;
private MetricRegistry metricRegistry;
private Histogram histogram;
private String collection;
Expand Down Expand Up @@ -111,8 +107,6 @@ private void run(JobExecutionContext context, AnomalyFunction anomalyFunction)
(TimeSeriesResponseConverter) context.getJobDetail().getJobDataMap()
.get(TIME_SERIES_RESPONSE_CONVERTER);
resultDAO = (AnomalyResultDAO) context.getJobDetail().getJobDataMap().get(RESULT_DAO);
relationDAO =
(AnomalyFunctionRelationDAO) context.getJobDetail().getJobDataMap().get(RELATION_DAO);
metricRegistry = (MetricRegistry) context.getJobDetail().getJobDataMap().get(METRIC_REGISTRY);
String windowEndProp = context.getJobDetail().getJobDataMap().getString(WINDOW_END);
String windowStartProp = context.getJobDetail().getJobDataMap().getString(WINDOW_START);
Expand Down Expand Up @@ -293,15 +287,6 @@ private List<AnomalyResult> getExistingAnomalies() {
results.addAll(resultDAO
.findAllByTimeAndFunctionId(windowStart.getMillis(), windowEnd.getMillis(),
anomalyFunction.getSpec().getId()));

// The ones for any related functions
List<AnomalyFunctionRelation> relations =
relationDAO.findByParent(anomalyFunction.getSpec().getId());
for (AnomalyFunctionRelation relation : relations) {
results.addAll(resultDAO
.findAllByTimeAndFunctionId(windowStart.getMillis(), windowEnd.getMillis(),
relation.getChildId()));
}
return results;
}

Expand Down
Expand Up @@ -29,7 +29,6 @@
import com.linkedin.thirdeye.client.timeseries.TimeSeriesHandler;
import com.linkedin.thirdeye.client.timeseries.TimeSeriesResponseConverter;
import com.linkedin.thirdeye.db.entity.AnomalyFunctionSpec;
import com.linkedin.thirdeye.db.dao.AnomalyFunctionRelationDAO;
import com.linkedin.thirdeye.detector.function.AnomalyFunction;
import com.linkedin.thirdeye.detector.function.AnomalyFunctionFactory;

Expand All @@ -41,7 +40,6 @@ public class AnomalyDetectionJobManager {
private final TimeSeriesHandler timeSeriesHandler;
private final TimeSeriesResponseConverter timeSeriesResponseConverter;
private final AnomalyFunctionDAO specDAO;
private final AnomalyFunctionRelationDAO relationDAO;
private final AnomalyResultDAO resultDAO;
private final Object sync;
private final Map<Long, String> scheduledJobKeys;
Expand All @@ -53,9 +51,8 @@ public class AnomalyDetectionJobManager {
private static final ObjectMapper reader = new ObjectMapper(new YAMLFactory());

public AnomalyDetectionJobManager(Scheduler quartzScheduler, AnomalyFunctionDAO specDAO,
AnomalyFunctionRelationDAO relationDAO, AnomalyResultDAO resultDAO,
MetricRegistry metricRegistry, AnomalyFunctionFactory anomalyFunctionFactory,
FailureEmailConfiguration failureEmailConfig) {
AnomalyResultDAO resultDAO, MetricRegistry metricRegistry,
AnomalyFunctionFactory anomalyFunctionFactory, FailureEmailConfiguration failureEmailConfig) {

this.queryCache = CACHE_REGISTRY_INSTANCE.getQueryCache();

Expand All @@ -64,7 +61,6 @@ public AnomalyDetectionJobManager(Scheduler quartzScheduler, AnomalyFunctionDAO

this.quartzScheduler = quartzScheduler;
this.specDAO = specDAO;
this.relationDAO = relationDAO;
this.resultDAO = resultDAO;
this.metricRegistry = metricRegistry;
this.sync = new Object();
Expand Down Expand Up @@ -121,7 +117,6 @@ private void buildAndScheduleJob(String jobKey, Trigger trigger, AnomalyFunction
job.getJobDataMap().put(AnomalyDetectionJob.WINDOW_END, windowEndIsoString);
job.getJobDataMap().put(AnomalyDetectionJob.RESULT_DAO, resultDAO);
job.getJobDataMap().put(AnomalyDetectionJob.METRIC_REGISTRY, metricRegistry);
job.getJobDataMap().put(AnomalyDetectionJob.RELATION_DAO, relationDAO);

job.getJobDataMap().put(FailureEmailConfiguration.FAILURE_EMAIL_CONFIG_KEY, failureEmailConfig);

Expand Down

0 comments on commit c642a09

Please sign in to comment.