diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java index f590d31aeed4..cc8476930430 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertPluginManager.java @@ -41,8 +41,6 @@ import java.util.ServiceLoader; import java.util.Set; -import javax.annotation.PostConstruct; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; @@ -55,23 +53,23 @@ public final class AlertPluginManager { private final PluginDao pluginDao; - private final Map channelKeyedById = new HashMap<>(); - - private final PluginParams warningTypeParams = getWarningTypeParams(); - public AlertPluginManager(PluginDao pluginDao) { this.pluginDao = pluginDao; } + private final Map channelKeyedById = new HashMap<>(); + + private final PluginParams warningTypeParams = getWarningTypeParams(); + public PluginParams getWarningTypeParams() { return - RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE) - .addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false)) - .addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false)) - .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false)) - .setValue(WarningType.ALL.getDescp()) - .addValidate(Validate.newBuilder().setRequired(true).build()) - .build(); + RadioParam.newBuilder(AlertConstants.NAME_WARNING_TYPE, AlertConstants.WARNING_TYPE) + .addParamsOptions(new ParamsOptions(WarningType.SUCCESS.getDescp(), WarningType.SUCCESS.getDescp(), false)) + .addParamsOptions(new ParamsOptions(WarningType.FAILURE.getDescp(), WarningType.FAILURE.getDescp(), false)) + .addParamsOptions(new ParamsOptions(WarningType.ALL.getDescp(), WarningType.ALL.getDescp(), false)) + .setValue(WarningType.ALL.getDescp()) + .addValidate(Validate.newBuilder().setRequired(true).build()) + .build(); } @EventListener diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java index c85292f72538..6bb4aa004993 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertRequestProcessor.java @@ -36,10 +36,10 @@ public final class AlertRequestProcessor implements NettyRequestProcessor { private static final Logger logger = LoggerFactory.getLogger(AlertRequestProcessor.class); - private final AlertSender alertSender; + private final AlertSenderService alertSenderService; - public AlertRequestProcessor(AlertSender alertSender) { - this.alertSender = alertSender; + public AlertRequestProcessor(AlertSenderService alertSenderService) { + this.alertSenderService = alertSenderService; } @Override @@ -51,7 +51,7 @@ public void process(Channel channel, Command command) { logger.info("Received command : {}", alertSendRequestCommand); - AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler( + AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler( alertSendRequestCommand.getGroupId(), alertSendRequestCommand.getTitle(), alertSendRequestCommand.getContent(), diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java similarity index 85% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java index 3ddc5c57953f..af4791353712 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSender.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertSenderService.java @@ -17,21 +17,26 @@ package org.apache.dolphinscheduler.alert; +import org.apache.commons.collections.CollectionUtils; import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertConstants; import org.apache.dolphinscheduler.alert.api.AlertData; import org.apache.dolphinscheduler.alert.api.AlertInfo; import org.apache.dolphinscheduler.alert.api.AlertResult; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.AlertStatus; import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.thread.Stopper; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.dao.entity.AlertPluginInstance; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseResult; - -import org.apache.commons.collections.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.HashSet; @@ -40,22 +45,39 @@ import java.util.Optional; import java.util.Set; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; - -@Component -public final class AlertSender { - private static final Logger logger = LoggerFactory.getLogger(AlertSender.class); +@Service +public final class AlertSenderService extends Thread { + private static final Logger logger = LoggerFactory.getLogger(AlertSenderService.class); private final AlertDao alertDao; private final AlertPluginManager alertPluginManager; - public AlertSender(AlertDao alertDao, AlertPluginManager alertPluginManager) { + public AlertSenderService(AlertDao alertDao, AlertPluginManager alertPluginManager) { this.alertDao = alertDao; this.alertPluginManager = alertPluginManager; } + @Override + public synchronized void start() { + super.setName("AlertSenderService"); + super.start(); + } + + @Override + public void run() { + logger.info("alert sender started"); + while (Stopper.isRunning()) { + try { + List alerts = alertDao.listPendingAlerts(); + this.send(alerts); + ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS * 5L); + } catch (Exception e) { + logger.error("alert sender thread error", e); + } + } + } + + public void send(List alerts) { for (Alert alert : alerts) { //get alert group from alert @@ -68,11 +90,11 @@ public void send(List alerts) { } AlertData alertData = new AlertData(); alertData.setId(alert.getId()) - .setContent(alert.getContent()) - .setLog(alert.getLog()) - .setTitle(alert.getTitle()) - .setTitle(alert.getTitle()) - .setWarnType(alert.getWarningType().getCode()); + .setContent(alert.getContent()) + .setLog(alert.getLog()) + .setTitle(alert.getTitle()) + .setTitle(alert.getTitle()) + .setWarnType(alert.getWarningType().getCode()); int sendSuccessCount = 0; for (AlertPluginInstance instance : alertInstanceList) { @@ -93,23 +115,22 @@ public void send(List alerts) { } alertDao.updateAlert(alertStatus, "", alert.getId()); } - } /** * sync send alert handler * * @param alertGroupId alertGroupId - * @param title title - * @param content content + * @param title title + * @param content content * @return AlertSendResponseCommand */ - public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content , int warnType) { + public AlertSendResponseCommand syncHandler(int alertGroupId, String title, String content, int warnType) { List alertInstanceList = alertDao.listInstanceByAlertGroupId(alertGroupId); AlertData alertData = new AlertData(); alertData.setContent(content) - .setTitle(title) - .setWarnType(warnType); + .setTitle(title) + .setWarnType(warnType); boolean sendResponseStatus = true; List sendResponseResults = new ArrayList<>(); @@ -128,7 +149,7 @@ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, Stri AlertResult alertResult = this.alertResultHandler(instance, alertData); if (alertResult != null) { AlertSendResponseResult alertSendResponseResult = new AlertSendResponseResult( - Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage()); + Boolean.parseBoolean(String.valueOf(alertResult.getStatus())), alertResult.getMessage()); sendResponseStatus = sendResponseStatus && alertSendResponseResult.getStatus(); sendResponseResults.add(alertSendResponseResult); } @@ -140,7 +161,7 @@ public AlertSendResponseCommand syncHandler(int alertGroupId, String title, Stri /** * alert result handler * - * @param instance instance + * @param instance instance * @param alertData alertData * @return AlertResult */ @@ -159,7 +180,7 @@ private AlertResult alertResultHandler(AlertPluginInstance instance, AlertData a Map paramsMap = JSONUtils.toMap(instance.getPluginInstanceParams()); String instanceWarnType = WarningType.ALL.getDescp(); - if(paramsMap != null){ + if (paramsMap != null) { instanceWarnType = paramsMap.getOrDefault(AlertConstants.NAME_WARNING_TYPE, WarningType.ALL.getDescp()); } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java index 6b18befe39e3..60bd09e6e2b8 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/main/java/org/apache/dolphinscheduler/alert/AlertServer.java @@ -17,73 +17,95 @@ package org.apache.dolphinscheduler.alert; +import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.thread.Stopper; -import org.apache.dolphinscheduler.dao.AlertDao; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.dao.PluginDao; -import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; - -import java.io.Closeable; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import javax.annotation.PreDestroy; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.event.EventListener; +import javax.annotation.PreDestroy; +import java.io.Closeable; + @SpringBootApplication @ComponentScan("org.apache.dolphinscheduler") public class AlertServer implements Closeable { private static final Logger logger = LoggerFactory.getLogger(AlertServer.class); private final PluginDao pluginDao; - private final AlertDao alertDao; - private final AlertPluginManager alertPluginManager; - private final AlertSender alertSender; + private final AlertSenderService alertSenderService; private final AlertRequestProcessor alertRequestProcessor; + private final AlertConfig alertConfig; + private NettyRemotingServer nettyRemotingServer; - private NettyRemotingServer server; - - @Autowired - private AlertConfig config; - - public AlertServer(PluginDao pluginDao, AlertDao alertDao, AlertPluginManager alertPluginManager, AlertSender alertSender, AlertRequestProcessor alertRequestProcessor) { + public AlertServer(PluginDao pluginDao, AlertSenderService alertSenderService, AlertRequestProcessor alertRequestProcessor, AlertConfig alertConfig) { this.pluginDao = pluginDao; - this.alertDao = alertDao; - this.alertPluginManager = alertPluginManager; - this.alertSender = alertSender; + this.alertSenderService = alertSenderService; this.alertRequestProcessor = alertRequestProcessor; + this.alertConfig = alertConfig; } + /** + * alert server startup, not use web service + * + * @param args arguments + */ public static void main(String[] args) { - SpringApplication.run(AlertServer.class, args); + Thread.currentThread().setName(Constants.THREAD_NAME_ALERT_SERVER); + new SpringApplicationBuilder(AlertServer.class).web(WebApplicationType.NONE).run(args); } @EventListener - public void start(ApplicationReadyEvent readyEvent) { - logger.info("Starting Alert server"); + public void run(ApplicationReadyEvent readyEvent) { + logger.info("alert server starting..."); checkTable(); startServer(); - - Executors.newScheduledThreadPool(1) - .scheduleAtFixedRate(new Sender(), 5, 5, TimeUnit.SECONDS); + alertSenderService.start(); } @Override @PreDestroy public void close() { - server.close(); + destroy("alert server destroy"); + } + + /** + * gracefully stop + * + * @param cause stop cause + */ + public void destroy(String cause) { + + try { + // execute only once + if (Stopper.isStopped()) { + return; + } + + logger.info("alert server is stopping ..., cause : {}", cause); + + // set stop signal is true + Stopper.stop(); + + // thread sleep 3 seconds for thread quietly stop + ThreadUtils.sleep(3000L); + + // close + this.nettyRemotingServer.close(); + + } catch (Exception e) { + logger.error("alert server stop exception ", e); + } } private void checkTable() { @@ -95,26 +117,11 @@ private void checkTable() { private void startServer() { NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(config.getPort()); + serverConfig.setListenPort(alertConfig.getPort()); - server = new NettyRemotingServer(serverConfig); - server.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor); - server.start(); + nettyRemotingServer = new NettyRemotingServer(serverConfig); + nettyRemotingServer.registerProcessor(CommandType.ALERT_SEND_REQUEST, alertRequestProcessor); + nettyRemotingServer.start(); } - final class Sender implements Runnable { - @Override - public void run() { - if (!Stopper.isRunning()) { - return; - } - - try { - final List alerts = alertDao.listPendingAlerts(); - alertSender.send(alerts); - } catch (Exception e) { - logger.error("Failed to send alert", e); - } - } - } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java index 911b912d760c..c37cf6fe7aff 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/AlertServerTest.java @@ -18,7 +18,9 @@ package org.apache.dolphinscheduler.alert; import junit.framework.TestCase; +import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.PluginDao; +import org.apache.dolphinscheduler.dao.entity.Alert; import org.apache.dolphinscheduler.remote.NettyRemotingServer; import org.apache.dolphinscheduler.remote.config.NettyServerConfig; import org.junit.Assert; @@ -27,9 +29,13 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.mockito.junit.MockitoJUnitRunner; import org.powermock.reflect.Whitebox; +import java.util.ArrayList; +import java.util.List; + @RunWith(MockitoJUnitRunner.class) public class AlertServerTest extends TestCase { @@ -42,19 +48,26 @@ public class AlertServerTest extends TestCase { @Mock private AlertConfig alertConfig; + + @Mock + private AlertSenderService alertSenderService; @Test public void testStart() { + Mockito.when(pluginDao.checkPluginDefineTableExist()).thenReturn(true); Mockito.when(alertConfig.getPort()).thenReturn(50053); - - alertServer.start(null); + + Mockito.doNothing().when(alertSenderService).start(); + + alertServer.run(null); - NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "server"); + NettyRemotingServer nettyRemotingServer = Whitebox.getInternalState(alertServer, "nettyRemotingServer"); NettyServerConfig nettyServerConfig = Whitebox.getInternalState(nettyRemotingServer, "serverConfig"); Assert.assertEquals(50053, nettyServerConfig.getListenPort()); + } } diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java index 64e92a02add8..41277fad50bf 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/processor/AlertRequestProcessorTest.java @@ -20,30 +20,35 @@ import static org.mockito.Mockito.mock; import org.apache.dolphinscheduler.alert.AlertRequestProcessor; -import org.apache.dolphinscheduler.alert.AlertSender; +import org.apache.dolphinscheduler.alert.AlertSenderService; import org.apache.dolphinscheduler.common.enums.WarningType; -import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.alert.AlertSendRequestCommand; +import org.apache.dolphinscheduler.remote.command.alert.AlertSendResponseCommand; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import io.netty.channel.Channel; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; +@RunWith(MockitoJUnitRunner.class) public class AlertRequestProcessorTest { + @InjectMocks private AlertRequestProcessor alertRequestProcessor; - @Before - public void before() { - final AlertDao alertDao = mock(AlertDao.class); - alertRequestProcessor = new AlertRequestProcessor(new AlertSender(alertDao, null)); - } + @Mock + private AlertSenderService alertSenderService; + @Test public void testProcess() { + Mockito.when(alertSenderService.syncHandler(1, "title", "content", WarningType.FAILURE.getCode())).thenReturn(new AlertSendResponseCommand()); Channel channel = mock(Channel.class); AlertSendRequestCommand alertSendRequestCommand = new AlertSendRequestCommand(1, "title", "content", WarningType.FAILURE.getCode()); Command reqCommand = alertSendRequestCommand.convert2Command(); diff --git a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java similarity index 86% rename from dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java rename to dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java index 4060e4634500..7b7853daf5c2 100644 --- a/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderTest.java +++ b/dolphinscheduler-alert/dolphinscheduler-alert-server/src/test/java/org/apache/dolphinscheduler/alert/runner/AlertSenderServiceTest.java @@ -21,7 +21,7 @@ import static org.mockito.Mockito.when; import org.apache.dolphinscheduler.alert.AlertPluginManager; -import org.apache.dolphinscheduler.alert.AlertSender; +import org.apache.dolphinscheduler.alert.AlertSenderService; import org.apache.dolphinscheduler.alert.api.AlertChannel; import org.apache.dolphinscheduler.alert.api.AlertResult; import org.apache.dolphinscheduler.common.enums.WarningType; @@ -39,24 +39,29 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class AlertSenderTest { - private static final Logger logger = LoggerFactory.getLogger(AlertSenderTest.class); +public class AlertSenderServiceTest { + private static final Logger logger = LoggerFactory.getLogger(AlertSenderServiceTest.class); + @Mock private AlertDao alertDao; + @Mock private PluginDao pluginDao; + @Mock private AlertPluginManager alertPluginManager; - private AlertSender alertSender; + @InjectMocks + private AlertSenderService alertSenderService; @Before public void before() { - alertDao = mock(AlertDao.class); - pluginDao = mock(PluginDao.class); - alertPluginManager = mock(AlertPluginManager.class); + MockitoAnnotations.initMocks(this); } @Test @@ -65,12 +70,11 @@ public void testSyncHandler() { int alertGroupId = 1; String title = "alert mail test title"; String content = "alert mail test content"; - alertSender = new AlertSender(alertDao, alertPluginManager); //1.alert instance does not exist when(alertDao.listInstanceByAlertGroupId(alertGroupId)).thenReturn(null); - AlertSendResponseCommand alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + AlertSendResponseCommand alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertFalse(alertSendResponseCommand.getResStatus()); alertSendResponseCommand.getResResults().forEach(result -> logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); @@ -89,7 +93,7 @@ public void testSyncHandler() { PluginDefine pluginDefine = new PluginDefine(pluginName, "1", null); when(pluginDao.getPluginDefineById(pluginDefineId)).thenReturn(pluginDefine); - alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertFalse(alertSendResponseCommand.getResStatus()); alertSendResponseCommand.getResResults().forEach(result -> logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); @@ -99,7 +103,7 @@ public void testSyncHandler() { when(alertChannelMock.process(Mockito.any())).thenReturn(null); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); - alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertFalse(alertSendResponseCommand.getResStatus()); alertSendResponseCommand.getResResults().forEach(result -> logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); @@ -111,7 +115,7 @@ public void testSyncHandler() { when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); - alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertFalse(alertSendResponseCommand.getResStatus()); alertSendResponseCommand.getResResults().forEach(result -> logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); @@ -123,7 +127,7 @@ public void testSyncHandler() { when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); - alertSendResponseCommand = alertSender.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); + alertSendResponseCommand = alertSenderService.syncHandler(alertGroupId, title, content, WarningType.ALL.getCode()); Assert.assertTrue(alertSendResponseCommand.getResStatus()); alertSendResponseCommand.getResResults().forEach(result -> logger.info("alert send response result, status:{}, message:{}", result.getStatus(), result.getMessage())); @@ -143,7 +147,7 @@ public void testRun() { alert.setWarningType(WarningType.FAILURE); alertList.add(alert); - alertSender = new AlertSender(alertDao, alertPluginManager); +// alertSenderService = new AlertSenderService(); int pluginDefineId = 1; String pluginInstanceParams = "alert-instance-mail-params"; @@ -165,6 +169,7 @@ public void testRun() { when(alertChannelMock.process(Mockito.any())).thenReturn(alertResult); when(alertPluginManager.getAlertChannel(1)).thenReturn(Optional.of(alertChannelMock)); Assert.assertTrue(Boolean.parseBoolean(alertResult.getStatus())); - alertSender.send(alertList); + when(alertDao.listInstanceByAlertGroupId(1)).thenReturn(new ArrayList<>()); + alertSenderService.send(alertList); } } diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java index 0572e8f9ec73..4199627f7155 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java @@ -327,6 +327,7 @@ private Constants() { public static final String NULL = "NULL"; public static final String THREAD_NAME_MASTER_SERVER = "Master-Server"; public static final String THREAD_NAME_WORKER_SERVER = "Worker-Server"; + public static final String THREAD_NAME_ALERT_SERVER = "Alert-Server"; /** * command parameter keys diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java index ba37e22d2a3f..83c81c9fbe59 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/alert/AlertSendRequestCommand.java @@ -88,4 +88,14 @@ public Command convert2Command() { command.setBody(body); return command; } + + @Override + public String toString() { + return "AlertSendRequestCommand{" + + "groupId=" + groupId + + ", title='" + title + '\'' + + ", content='" + content + '\'' + + ", warnType=" + warnType + + '}'; + } }