Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamically configure alarm settings #3557

Merged
merged 13 commits into from Oct 5, 2019
Expand Up @@ -18,11 +18,17 @@

package org.apache.skywalking.oap.server.core.alarm.provider;

import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.joda.time.*;
import org.slf4j.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.skywalking.oap.server.core.alarm.AlarmCallback;
import org.apache.skywalking.oap.server.core.alarm.AlarmMessage;
import org.joda.time.LocalDateTime;
import org.joda.time.Minutes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Alarm core includes metrics values in certain time windows based on alarm settings. By using its internal timer
Expand All @@ -33,24 +39,15 @@
public class AlarmCore {
private static final Logger logger = LoggerFactory.getLogger(AlarmCore.class);

private Map<String, List<RunningRule>> runningContext;
private LocalDateTime lastExecuteTime;
private AlarmRulesWatcher alarmRulesWatcher;

AlarmCore(Rules rules) {
runningContext = new HashMap<>();
rules.getRules().forEach(rule -> {
RunningRule runningRule = new RunningRule(rule);

String metricsName = rule.getMetricsName();

List<RunningRule> runningRules = runningContext.computeIfAbsent(metricsName, key -> new ArrayList<>());

runningRules.add(runningRule);
});
AlarmCore(AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
}

public List<RunningRule> findRunningRule(String metricsName) {
return runningContext.get(metricsName);
return alarmRulesWatcher.getRunningContext().get(metricsName);
}

public void start(List<AlarmCallback> allCallbacks) {
Expand All @@ -62,10 +59,10 @@ public void start(List<AlarmCallback> allCallbacks) {
LocalDateTime checkTime = LocalDateTime.now();
int minutes = Minutes.minutesBetween(lastExecuteTime, checkTime).getMinutes();
boolean[] hasExecute = new boolean[] {false};
runningContext.values().forEach(ruleList -> ruleList.forEach(runningRule -> {
alarmRulesWatcher.getRunningContext().values().forEach(ruleList -> ruleList.forEach(runningRule -> {
if (minutes > 0) {
runningRule.moveTo(checkTime);
/**
/*
* Don't run in the first quarter of each minute, to avoid triggering false alarms.
*/
if (checkTime.getSecondOfMinute() > 15) {
Expand Down
Expand Up @@ -19,6 +19,9 @@
package org.apache.skywalking.oap.server.core.alarm.provider;

import java.io.*;

import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.library.module.*;
Expand All @@ -27,6 +30,7 @@
public class AlarmModuleProvider extends ModuleProvider {

private NotifyHandler notifyHandler;
private AlarmRulesWatcher alarmRulesWatcher;

@Override public String name() {
return "default";
Expand All @@ -49,19 +53,24 @@ public class AlarmModuleProvider extends ModuleProvider {
}
RulesReader reader = new RulesReader(applicationReader);
Rules rules = reader.readRules();
notifyHandler = new NotifyHandler(rules);

alarmRulesWatcher = new AlarmRulesWatcher(rules, this);

notifyHandler = new NotifyHandler(alarmRulesWatcher);
notifyHandler.init(new AlarmStandardPersistence());
this.registerServiceImplementation(MetricsNotify.class, notifyHandler);
}

/**
 * Registers the alarm rules watcher with the {@code DynamicConfigurationService} looked up
 * from the configuration module.
 *
 * @throws ServiceNotProvidedException declared by the overridden method
 * @throws ModuleStartException declared by the overridden method
 */
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
    // Resolve the service and hand it the watcher in a single fluent chain.
    getManager()
        .find(ConfigurationModule.NAME)
        .provider()
        .getService(DynamicConfigurationService.class)
        .registerConfigChangeWatcher(alarmRulesWatcher);
}

/**
 * Calls {@code initCache} on the notify handler, passing this provider's module manager.
 *
 * @throws ServiceNotProvidedException declared by the overridden method
 * @throws ModuleStartException declared by the overridden method
 */
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
    this.notifyHandler.initCache(this.getManager());
}

@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
return new String[] {CoreModule.NAME, ConfigurationModule.NAME};
}
}
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.alarm.provider;

import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.AlarmModule;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;

/**
* @author kezhenxu94
*/
public class AlarmRulesWatcher extends ConfigChangeWatcher {
kezhenxu94 marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Running rules grouped by metrics name. The map is rebuilt and swapped in as a whole each
 * time a rule set is applied, never mutated in place by this class.
 */
@Getter
private volatile Map<String, List<RunningRule>> runningContext;
/** The rule set most recently applied; backs {@link #getRules()} and {@link #getWebHooks()}. */
private volatile Rules rules;
/**
 * Raw settings text returned by {@link #value()}. Set to {@code Const.EMPTY_STRING} on
 * construction and on DELETE events, otherwise to the new value carried by a change event.
 */
private volatile String settingsString;

/**
 * Creates a watcher for the {@code "alarm-settings"} item of the alarm module and applies
 * the given rules as the initial rule set.
 *
 * @param defaultRules rules to apply immediately
 * @param provider     module provider passed through to the super constructor
 */
public AlarmRulesWatcher(Rules defaultRules, ModuleProvider provider) {
    super(AlarmModule.NAME, provider, "alarm-settings");

    // Start from an empty state; no settings text has been received yet.
    settingsString = Const.EMPTY_STRING;
    runningContext = new HashMap<>();

    // Build the running context from the default rules.
    this.notify(defaultRules);
}

/**
 * Applies a configuration change event to the alarm rules.
 *
 * <p>A {@code DELETE} event replaces the current rules with an empty {@link Rules} instance and
 * resets the settings text to {@code Const.EMPTY_STRING}. Any other event parses the event's new
 * value with {@link RulesReader} and applies the resulting rules.
 *
 * <p>The settings text returned by {@link #value()} is only updated after the new rules have
 * been parsed and applied, so a failure while reading the new value leaves both the active
 * rules and the reported settings text unchanged.
 *
 * @param value the change event carrying the event type and, for non-delete events, the new
 *              settings text
 */
@Override
public void notify(ConfigChangeEvent value) {
    if (value.getEventType() == EventType.DELETE) {
        notify(new Rules());
        settingsString = Const.EMPTY_STRING;
        return;
    }

    String newSettings = value.getNewValue();
    // Parse and apply first; commit the text last so value() never reports settings
    // that were not actually applied.
    Rules parsedRules = new RulesReader(new StringReader(newSettings)).readRules();
    notify(parsedRules);
    settingsString = newSettings;
}

/**
 * Rebuilds the running context from the given rule set and publishes both.
 *
 * <p>One {@link RunningRule} is created per alarm rule and grouped by the rule's metrics name.
 * The new map is built completely before it is assigned to the field.
 *
 * @param newRules the rule set to make current
 */
private void notify(Rules newRules) {
    Map<String, List<RunningRule>> rebuiltContext = new HashMap<>();

    for (AlarmRule alarmRule : newRules.getRules()) {
        RunningRule running = new RunningRule(alarmRule);
        rebuiltContext
            .computeIfAbsent(alarmRule.getMetricsName(), unused -> new ArrayList<>())
            .add(running);
    }

    this.rules = newRules;
    this.runningContext = rebuiltContext;
}

/**
 * Returns the settings text currently held by this watcher.
 *
 * @return the stored settings string
 */
@Override
public String value() {
    return this.settingsString;
}

/**
 * Returns the alarm rules of the currently applied rule set.
 *
 * @return the list obtained from the current {@link Rules} instance
 */
public List<AlarmRule> getRules() {
    Rules current = this.rules;
    return current.getRules();
}

/**
 * Returns the webhook entries of the currently applied rule set.
 *
 * @return the list obtained from the current {@link Rules} instance
 */
public List<String> getWebHooks() {
    Rules current = this.rules;
    return current.getWebhooks();
}
}
Expand Up @@ -33,11 +33,11 @@ public class NotifyHandler implements MetricsNotify {
private EndpointInventoryCache endpointInventoryCache;

private final AlarmCore core;
private final Rules rules;
private final AlarmRulesWatcher alarmRulesWatcher;

public NotifyHandler(Rules rules) {
this.rules = rules;
core = new AlarmCore(rules);
public NotifyHandler(AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
core = new AlarmCore(alarmRulesWatcher);
}

@Override public void notify(Metrics metrics) {
Expand Down Expand Up @@ -95,11 +95,8 @@ public NotifyHandler(Rules rules) {
}

public void init(AlarmCallback... callbacks) {
List<AlarmCallback> allCallbacks = new ArrayList<>();
for (AlarmCallback callback : callbacks) {
allCallbacks.add(callback);
}
allCallbacks.add(new WebhookCallback(rules.getWebhooks()));
List<AlarmCallback> allCallbacks = new ArrayList<>(Arrays.asList(callbacks));
allCallbacks.add(new WebhookCallback(alarmRulesWatcher));
core.start(allCallbacks);
}

Expand Down
Expand Up @@ -18,17 +18,16 @@

package org.apache.skywalking.oap.server.core.alarm.provider;

import com.google.gson.Gson;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import com.google.gson.Gson;
import io.netty.handler.codec.http.HttpHeaderValues;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
Expand All @@ -51,12 +50,12 @@ public class WebhookCallback implements AlarmCallback {
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 10000;

private List<String> remoteEndpoints;
private AlarmRulesWatcher alarmRulesWatcher;
private RequestConfig requestConfig;
private Gson gson = new Gson();

public WebhookCallback(List<String> remoteEndpoints) {
this.remoteEndpoints = remoteEndpoints;
public WebhookCallback(AlarmRulesWatcher alarmRulesWatcher) {
this.alarmRulesWatcher = alarmRulesWatcher;
requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
Expand All @@ -65,13 +64,13 @@ public WebhookCallback(List<String> remoteEndpoints) {

@Override
public void doAlarm(List<AlarmMessage> alarmMessage) {
if (remoteEndpoints.size() == 0) {
if (alarmRulesWatcher.getRules().size() == 0) {
return;
}

CloseableHttpClient httpClient = HttpClients.custom().build();
try {
remoteEndpoints.forEach(url -> {
alarmRulesWatcher.getWebHooks().forEach(url -> {
HttpPost post = new HttpPost(url);
post.setConfig(requestConfig);
post.setHeader(HttpHeaders.ACCEPT, HttpHeaderValues.APPLICATION_JSON.toString());
Expand All @@ -88,8 +87,6 @@ public void doAlarm(List<AlarmMessage> alarmMessage) {
}
} catch (UnsupportedEncodingException e) {
logger.error("Alarm to JSON error, " + e.getMessage(), e);
} catch (ClientProtocolException e) {
logger.error("send alarm to " + url + " failure.", e);
} catch (IOException e) {
logger.error("send alarm to " + url + " failure.", e);
}
Expand Down
Expand Up @@ -50,7 +50,7 @@ public void testTriggerTimePoint() throws InterruptedException {
Rules emptyRules = new Rules();
emptyRules.setRules(new ArrayList<>(0));
emptyRules.setWebhooks(new ArrayList<>(0));
AlarmCore core = new AlarmCore(emptyRules);
AlarmCore core = new AlarmCore(new AlarmRulesWatcher(emptyRules, null));

Map<String, List<RunningRule>> runningContext = Whitebox.getInternalState(core, "runningContext");

Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.oap.server.core.alarm.provider;

import java.io.IOException;
import java.io.Reader;

import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.spy;

/**
 * Verifies that {@link AlarmRulesWatcher} replaces its rules, webhooks and running context on a
 * MODIFY event and clears them on a DELETE event.
 *
 * @author kezhenxu94
 */
public class AlarmRulesWatcherTest {
    @Spy
    private AlarmRulesWatcher alarmRulesWatcher = new AlarmRulesWatcher(new Rules(), null);

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    /**
     * Starting from an empty rule set, a MODIFY event carrying the content of
     * {@code alarm-settings.yml} must populate rules, webhooks and the running context.
     */
    @Test
    public void shouldSetAlarmRulesOnEventChanged() throws IOException {
        assertTrue(alarmRulesWatcher.getRules().isEmpty());

        String settings;
        // Close the resource reader once its content has been consumed.
        try (Reader reader = ResourceUtils.read("alarm-settings.yml")) {
            settings = readFully(reader);
        }

        alarmRulesWatcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(settings, ConfigChangeWatcher.EventType.MODIFY));

        assertEquals(2, alarmRulesWatcher.getRules().size());
        assertEquals(2, alarmRulesWatcher.getWebHooks().size());
        assertEquals(2, alarmRulesWatcher.getRunningContext().size());
    }

    /**
     * Starting from the rules in {@code alarm-settings.yml}, a DELETE event must leave the
     * watcher with no rules, no webhooks and an empty running context.
     */
    @Test
    public void shouldClearAlarmRulesOnEventDeleted() throws IOException {
        Rules defaultRules;
        try (Reader reader = ResourceUtils.read("alarm-settings.yml")) {
            defaultRules = new RulesReader(reader).readRules();
        }

        alarmRulesWatcher = spy(new AlarmRulesWatcher(defaultRules, null));

        alarmRulesWatcher.notify(new ConfigChangeWatcher.ConfigChangeEvent("whatever", ConfigChangeWatcher.EventType.DELETE));

        assertEquals(0, alarmRulesWatcher.getRules().size());
        assertEquals(0, alarmRulesWatcher.getWebHooks().size());
        assertEquals(0, alarmRulesWatcher.getRunningContext().size());
    }

    /**
     * Reads the reader to end-of-stream. A single {@code read(char[])} call is allowed to return
     * fewer characters than are available, and returns -1 on an empty stream, so the content is
     * accumulated in a loop instead of relying on one call.
     */
    private static String readFully(Reader reader) throws IOException {
        StringBuilder content = new StringBuilder();
        char[] buffer = new char[8192];
        int count;
        while ((count = reader.read(buffer)) != -1) {
            content.append(buffer, 0, count);
        }
        return content.toString();
    }
}
Expand Up @@ -197,7 +197,7 @@ public void setUp() throws Exception {

Rules rules = new Rules();

notifyHandler = new NotifyHandler(rules);
notifyHandler = new NotifyHandler(new AlarmRulesWatcher(rules, null));

notifyHandler.init(alarmMessageList -> {
for (AlarmMessage message : alarmMessageList) {
Expand Down
Expand Up @@ -58,7 +58,10 @@ public void stop() throws Exception {
public void testWebhook() {
List<String> remoteEndpoints = new ArrayList<>();
remoteEndpoints.add("http://127.0.0.1:8778/webhook/receiveAlarm");
kezhenxu94 marked this conversation as resolved.
Show resolved Hide resolved
WebhookCallback webhookCallback = new WebhookCallback(remoteEndpoints);
Rules rules = new Rules();
rules.setWebhooks(remoteEndpoints);
AlarmRulesWatcher alarmRulesWatcher = new AlarmRulesWatcher(rules, null);
WebhookCallback webhookCallback = new WebhookCallback(alarmRulesWatcher);
List<AlarmMessage> alarmMessages = new ArrayList<>(2);
AlarmMessage alarmMessage = new AlarmMessage();
alarmMessage.setScopeId(DefaultScopeDefine.ALL);
Expand Down