Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-9221] [alert-server] optimization and gracefully close #9246

Merged
merged 17 commits into from Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -41,8 +41,6 @@
import java.util.ServiceLoader;
import java.util.Set;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
Expand All @@ -55,23 +53,23 @@ public final class AlertPluginManager {

private final PluginDao pluginDao;

private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();

private final PluginParams warningTypeParams = getWarningTypeParams();

public AlertPluginManager(PluginDao pluginDao) {
this.pluginDao = pluginDao;
}

private final Map<Integer, AlertChannel> channelKeyedById = new HashMap<>();

private final PluginParams warningTypeParams = getWarningTypeParams();

public PluginParams getWarningTypeParams() {
return
RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE)
.addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false))
.addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false))
.setValue(WarningType.ALL.getDescp())
.addValidate(Validate.newBuilder().setRequired(true).build())
.build();
}

@EventListener
Expand Down
Expand Up @@ -36,10 +36,10 @@
public final class AlertRequestProcessor implements NettyRequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class);

private final AlertSender alertSender;
private final AlertSenderService alertSenderService;

public AlertRequestProcessor(AlertSender alertSender) {
this.alertSender = alertSender;
public AlertRequestProcessor(AlertSenderService alertSenderService) {
this.alertSenderService = alertSenderService;
}

@Override
Expand All @@ -51,7 +51,7 @@ public void process(Channel channel, Command command) {

logger.info("Received command : {}", alertSendRequestCommand);

AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(
AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(
alertSendRequestCommand.getGroupId(),
alertSendRequestCommand.getTitle(),
alertSendRequestCommand.getContent(),
Expand Down
Expand Up @@ -17,21 +17,26 @@

package org.apache.dolphinscheduler.alert;

import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.alert.api.AlertChannel;
import org.apache.dolphinscheduler.alert.api.AlertConstants;
import org.apache.dolphinscheduler.alert.api.AlertData;
import org.apache.dolphinscheduler.alert.api.AlertInfo;
import org.apache.dolphinscheduler.alert.api.AlertResult;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AlertStatus;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand;
import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult;

import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.HashSet;
Expand All @@ -40,22 +45,39 @@
import java.util.Optional;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public final class AlertSender {
private static final Logger logger = LoggerFactory.getLogger(AlertSender.class);
@Service
public final class AlertSenderService extends Thread {
private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class);

private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;

public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager) {
public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) {
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
}

@Override
public synchronized void start() {
super.setName("AlertSenderService");
super.start();
}

@Override
public void run() {
logger.info("alert sender started");
while (Stopper.isRunning()) {
try {
List<Alert> alerts = alertDao.listPendingAlerts();
this.send(alerts);
ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L);
} catch (Exception e) {
logger.error("alert sender thread error", e);
}
}
}


public void send(List<Alert> alerts) {
for (Alert alert : alerts) {
//get alert group from alert
Expand All @@ -68,11 +90,11 @@ public void send(List<Alert> alerts) {
}
AlertData alertData = new AlertData();
alertData.setId(alert.getId())
.setContent(alert.getContent())
.setLog(alert.getLog())
.setTitle(alert.getTitle())
.setTitle(alert.getTitle())
.setWarnType(alert.getWarningType().getCode());
.setContent(alert.getContent())
.setLog(alert.getLog())
.setTitle(alert.getTitle())
.setTitle(alert.getTitle())
.setWarnType(alert.getWarningType().getCode());

int sendSuccessCount = 0;
for (AlertPluginInstance instance : alertInstanceList) {
Expand All @@ -93,23 +115,22 @@ public void send(List<Alert> alerts) {
}
alertDao.updateAlert(alertStatus, "", alert.getId());
}

}

/**
* sync send alert handler
*
* @param alertGroupId alertGroupId
* @param title title
* @param content content
* @param title title
* @param content content
* @return AlertSendResponseCommand
*/
public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content , int warnType) {
public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content, int warnType) {
List<AlertPluginInstance> alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId);
AlertData alertData = new AlertData();
alertData.setContent(content)
.setTitle(title)
.setWarnType(warnType);
.setTitle(title)
.setWarnType(warnType);

boolean sendResponseStatus = true;
List<AlertSendResponseResult> sendResponseResults = new ArrayList<>();
Expand All @@ -128,7 +149,7 @@ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, Stri
AlertResult alertResult = this.alertResultHandler(instance, alertData);
if (alertResult != null) {
AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult(
Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage());
sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus();
sendResponseResults.add(alertSendResponseResult);
}
Expand All @@ -140,7 +161,7 @@ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, Stri
/**
* alert result handler
*
* @param instance instance
* @param instance instance
* @param alertData alertData
* @return AlertResult
*/
Expand All @@ -159,7 +180,7 @@ private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData a
Map<String, String> paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams());
String instanceWarnType = WarningType.ALL.getDescp();

if(paramsMap != null){
if (paramsMap != null) {
instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp());
}

Expand Down
Expand Up @@ -17,73 +17,95 @@

package org.apache.dolphinscheduler.alert;

import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.dao.PluginDao;
import org.apache.dolphinscheduler.dao.entity.Alert;
import org.apache.dolphinscheduler.remote.NettyRemotingServer;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;

import java.io.Closeable;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.event.EventListener;

import javax.annotation.PreDestroy;
import java.io.Closeable;

@SpringBootApplication
@ComponentScan("org.apache.dolphinscheduler")
public class AlertServer implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(AlertServer.class);

private final PluginDao pluginDao;
private final AlertDao alertDao;
private final AlertPluginManager alertPluginManager;
private final AlertSender alertSender;
private final AlertSenderService alertSenderService;
private final AlertRequestProcessor alertRequestProcessor;
private final AlertConfig alertConfig;
private NettyRemotingServer nettyRemotingServer;

private NettyRemotingServer server;

@Autowired
private AlertConfig config;

public AlertServer(PluginDao pluginDao, AlertDao alertDao, AlertPluginManager alertPluginManager, AlertSender alertSender, AlertRequestProcessor alertRequestProcessor) {
public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) {
this.pluginDao = pluginDao;
this.alertDao = alertDao;
this.alertPluginManager = alertPluginManager;
this.alertSender = alertSender;
this.alertSenderService = alertSenderService;
this.alertRequestProcessor = alertRequestProcessor;
this.alertConfig = alertConfig;
}

/**
* alert server startup, not use web service
*
* @param args arguments
*/
public static void main(String[] args) {
SpringApplication.run(AlertServer.class, args);
Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER);
new SpringApplicationBuilder(AlertServer.class).web(WebApplicationType.NONE).run(args);
}

@EventListener
public void start(ApplicationReadyEvent readyEvent) {
logger.info("Starting Alert server");
public void run(ApplicationReadyEvent readyEvent) {
logger.info("alert server starting...");

checkTable();
startServer();

Executors.newScheduledThreadPool(1)
.scheduleAtFixedRate(new Sender(), 5, 5, TimeUnit.SECONDS);
alertSenderService.start();
}

@Override
@PreDestroy
public void close() {
server.close();
destroy("alert server destroy");
}

/**
* gracefully stop
*
* @param cause stop cause
*/
public void destroy(String cause) {

try {
// execute only once
if (Stopper.isStopped()) {
return;
}

logger.info("alert server is stopping ..., cause : {}", cause);

// set stop signal is true
Stopper.stop();

// thread sleep 3 seconds for thread quietly stop
ThreadUtils.sleep(3000L);

// close
this.nettyRemotingServer.close();

} catch (Exception e) {
logger.error("alert server stop exception ", e);
}
}

private void checkTable() {
Expand All @@ -95,26 +117,11 @@ private void checkTable() {

private void startServer() {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(config.getPort());
serverConfig.setListenPort(alertConfig.getPort());

server = new NettyRemotingServer(serverConfig);
server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
server.start();
nettyRemotingServer = new NettyRemotingServer(serverConfig);
nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor);
nettyRemotingServer.start();
}

final class Sender implements Runnable {
@Override
public void run() {
if (!Stopper.isRunning()) {
return;
}

try {
final List<Alert> alerts = alertDao.listPendingAlerts();
alertSender.send(alerts);
} catch (Exception e) {
logger.error("Failed to send alert", e);
}
}
}
}