Skip to content

Commit

Permalink
[TE] clean up somce code and disable sla alerts till we have the ramp…
Browse files Browse the repository at this point in the history
… feature (#5458)
  • Loading branch information
akshayrai committed May 28, 2020
1 parent 4988d72 commit b6cb44c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.pinot.thirdeye.anomaly.detection.trigger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -52,13 +50,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.thirdeye.detection.TaskUtils.*;


/**
* This class is to schedule detection tasks based on data availability events.
*/
public class DataAvailabilityTaskScheduler implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(DataAvailabilityTaskScheduler.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private ScheduledExecutorService executorService;
private long sleepPerRunInSec;
private long fallBackTimeInSec;
Expand Down Expand Up @@ -97,16 +96,17 @@ public void run() {
populateDetectionMapAndDatasetConfigMap(detection2DatasetMap, datasetConfigMap);
Map<Long, TaskDTO> runningDetection = retrieveRunningDetectionTasks();
int taskCount = 0;
long detectionEndTime = System.currentTimeMillis();
for (DetectionConfigDTO detectionConfig : detection2DatasetMap.keySet()) {
try {
long detectionConfigId = detectionConfig.getId();
DetectionPipelineTaskInfo taskInfo = TaskUtils.buildTaskInfoFromDetectionConfig(detectionConfig, detectionEndTime);
if (!runningDetection.containsKey(detectionConfigId)) {
if (isAllDatasetUpdated(detectionConfig, detection2DatasetMap.get(detectionConfig), datasetConfigMap)) {
if (isWithinSchedulingWindow(detection2DatasetMap.get(detectionConfig), datasetConfigMap)) {
//TODO: additional check is required if detection is based on aggregated value across multiple data points
long endtime = System.currentTimeMillis();
createDetectionTask(detectionConfig, endtime);
detectionIdToLastTaskEndTimeMap.put(detectionConfig.getId(), endtime);
createDetectionTask(taskInfo);
detectionIdToLastTaskEndTimeMap.put(detectionConfig.getId(), taskInfo.getEnd());
ThirdeyeMetricsUtil.eventScheduledTaskCounter.inc();
taskCount++;
} else {
Expand All @@ -119,16 +119,14 @@ public void run() {
// On the other hand, a user can setup an SLA alert if there is no data for 3 days.
if (needFallback(detectionConfig)) {
LOG.info("Scheduling a task for detection {} due to the fallback mechanism.", detectionConfigId);
long endtime = System.currentTimeMillis();
createDetectionTask(detectionConfig, endtime);

createDetectionTask(taskInfo);
if (DetectionUtils.isDataQualityCheckEnabled(detectionConfig)) {
createDataQualityTask(detectionConfig, endtime);
createDataQualityTask(taskInfo);
LOG.info("Scheduling a task for data sla check on detection config {} due to the fallback mechanism.",
detectionConfigId);
}

detectionIdToLastTaskEndTimeMap.put(detectionConfig.getId(), endtime);
detectionIdToLastTaskEndTimeMap.put(detectionConfig.getId(), taskInfo.getEnd());
ThirdeyeMetricsUtil.eventScheduledTaskFallbackCounter.inc();
taskCount++;
}
Expand Down Expand Up @@ -200,24 +198,6 @@ private Map<Long, TaskDTO> retrieveRunningDetectionTasks() {
return res;
}

private long createTask(TaskConstants.TaskType taskType, DetectionConfigDTO detectionConfig, long end)
throws JsonProcessingException {
DetectionPipelineTaskInfo taskInfo = TaskUtils.buildTaskInfoFromDetectionConfig(detectionConfig, end);
String taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
TaskDTO taskDTO = TaskUtils.buildTask(detectionConfig.getId(), taskInfoJson, taskType);
long id = taskDAO.save(taskDTO);
LOG.info("Created {} task {} with taskId {}", taskType, taskDTO, id);
return id;
}

private long createDetectionTask(DetectionConfigDTO detectionConfig, long end) throws JsonProcessingException {
return createTask(TaskConstants.TaskType.DETECTION, detectionConfig, end);
}

private long createDataQualityTask(DetectionConfigDTO detectionConfig, long end) throws JsonProcessingException {
return createTask(TaskConstants.TaskType.DATA_QUALITY, detectionConfig, end);
}

private void loadLatestTaskCreateTime(DetectionConfigDTO detectionConfig) throws Exception {
long detectionConfigId = detectionConfig.getId();
List<TaskDTO> tasks = taskDAO.findByNameOrderByCreateTime(TaskConstants.TaskType.DETECTION.toString() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.pinot.thirdeye.detection;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
Expand All @@ -32,13 +33,16 @@
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pinot.thirdeye.util.ThirdEyeUtils.getDetectionExpectedDelay;

/**
* Holds utility functions related to ThirdEye Tasks
*/
public class TaskUtils {
private static final Logger LOG = LoggerFactory.getLogger(TaskUtils.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/**
Expand Down Expand Up @@ -92,7 +96,32 @@ public static DetectionPipelineTaskInfo buildTaskInfo(JobExecutionContext jobExe

public static DetectionPipelineTaskInfo buildTaskInfoFromDetectionConfig(DetectionConfigDTO configDTO, long end) {
long delay = getDetectionExpectedDelay(configDTO);
long start = Math.max(configDTO.getLastTimestamp(), end - ThirdEyeUtils.DETECTION_TASK_MAX_LOOKBACK_WINDOW - delay);
long start = Math.max(configDTO.getLastTimestamp(), end - ThirdEyeUtils.DETECTION_TASK_MAX_LOOKBACK_WINDOW - delay);
return new DetectionPipelineTaskInfo(configDTO.getId(), start, end);
}

public static long createDetectionTask(DetectionPipelineTaskInfo taskInfo) {
return TaskUtils.createTask(TaskConstants.TaskType.DETECTION, taskInfo);
}

public static long createDataQualityTask(DetectionPipelineTaskInfo taskInfo) {
return TaskUtils.createTask(TaskConstants.TaskType.DATA_QUALITY, taskInfo);
}

/**
* Creates a generic task and saves it.
*/
public static long createTask(TaskConstants.TaskType taskType, DetectionPipelineTaskInfo taskInfo) {
String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
} catch (JsonProcessingException e) {
LOG.error("Exception when converting DetectionPipelineTaskInfo {} to jsonString", taskInfo, e);
}

TaskDTO taskDTO = TaskUtils.buildTask(taskInfo.getConfigId(), taskInfoJson, taskType);
long id = DAORegistry.getInstance().getTaskDAO().save(taskDTO);
LOG.info("Created {} task {} with taskId {}", taskType, taskDTO, id);
return id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@
public class DataQualityPipelineJob implements Job {
private static final Logger LOG = LoggerFactory.getLogger(DataQualityPipelineJob.class);

private final TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long DATA_AVAILABILITY_TASK_TIMEOUT = TimeUnit.MINUTES.toMillis(15);

@Override
Expand All @@ -58,16 +55,7 @@ public void execute(JobExecutionContext jobExecutionContext) {
return;
}

String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
} catch (JsonProcessingException e) {
LOG.error("Exception when converting DetectionPipelineTaskInfo {} to jsonString", taskInfo, e);
}

TaskDTO taskDTO = TaskUtils.buildTask(taskInfo.getConfigId(), taskInfoJson, TaskConstants.TaskType.DATA_QUALITY);
long taskId = taskDAO.save(taskDTO);
LOG.info("Created {} task {} with taskId {}", TaskConstants.TaskType.DATA_QUALITY, taskDTO, taskId);
TaskUtils.createDataQualityTask(taskInfo);
}
}

Expand Down

0 comments on commit b6cb44c

Please sign in to comment.