Skip to content

Commit

Permalink
Fix Watcher email actions not using WebhookService for HTTP requests (e…
Browse files Browse the repository at this point in the history
…lastic#95196)

This commit fixes a bug where the internal `WebhookService` was not being used for Watcher's `email` attachment `reporting` or `url` types. This meant that why the additional token configured as part of elastic#93426 was sent for `webhook` Watcher actions, it was not being sent in the HTTP request made when resolving the attachments of the `email` action.

To accomplish this, the parts of `WebhookService` that modify and make the request have been extracted, and the `WebhookService` is now used by the `ReportingAttachmentParser` and `HttpEmailAttachmentParser` to perform their HTTP requests.

Additionally, some debug logging has been added for when the token is sent (and when a `WebhookService` request is executed).
  • Loading branch information
dakrone committed Apr 14, 2023
1 parent 3d2d18f commit 3e55b5b
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 41 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/95196.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 95196
summary: Add logging for debug to Watcher webhook service
area: Watcher
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -352,11 +352,11 @@ public Collection<Object> createComponents(

TextTemplateEngine templateEngine = new TextTemplateEngine(scriptService);
Map<String, EmailAttachmentParser<?>> emailAttachmentParsers = new HashMap<>();
emailAttachmentParsers.put(HttpEmailAttachementParser.TYPE, new HttpEmailAttachementParser(httpClient, templateEngine));
emailAttachmentParsers.put(HttpEmailAttachementParser.TYPE, new HttpEmailAttachementParser(webhookService, templateEngine));
emailAttachmentParsers.put(DataAttachmentParser.TYPE, new DataAttachmentParser());
emailAttachmentParsers.put(
ReportingAttachmentParser.TYPE,
new ReportingAttachmentParser(settings, httpClient, templateEngine, clusterService.getClusterSettings())
new ReportingAttachmentParser(settings, webhookService, templateEngine, clusterService.getClusterSettings())
);
EmailAttachmentsParser emailAttachmentsParser = new EmailAttachmentsParser(emailAttachmentParsers);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.util.LazyInitializable;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.core.watcher.actions.Action;
Expand Down Expand Up @@ -121,41 +122,80 @@ public Action.Result execute(
Map<String, Object> model = Variables.createCtxParamsMap(ctx, payload);

// Render the original request
HttpRequest request = action.getRequest().render(templateEngine, model);
HttpRequest originalRequest = action.getRequest().render(templateEngine, model);

// If applicable, add the extra token to the headers
boolean tokenAdded = false;
WebhookAccount account = getAccount(NAME);
if (this.additionalTokenEnabled && account.hostTokenMap.size() > 0) {
// Generate a string like example.com:9200 to match against the list of hosts where the
// additional token should be provided. The token will only be added to the headers if
// the request matches the list.
String reqHostAndPort = request.host() + ":" + request.port();
if (Strings.hasText(account.hostTokenMap.get(reqHostAndPort))) {
// Add the additional token
tokenAdded = true;
request = request.copy().setHeader(TOKEN_HEADER_NAME, account.hostTokenMap.get(reqHostAndPort)).build();
}
if (ctx.simulateAction(actionId)) {
HttpRequest request = maybeModifyHttpRequest(originalRequest);
// If the request was modified, then the request has had the token added
boolean tokenAdded = originalRequest != request;
// Skip execution, return only the simulated (and redacted if necessary) response
return new WebhookAction.Result.Simulated(
tokenAdded ? request.copy().setHeader(TOKEN_HEADER_NAME, WatcherXContentParser.REDACTED_PASSWORD).build() : request
);
}

final Tuple<HttpRequest, HttpResponse> respTup = modifyAndExecuteHttpRequest(originalRequest);
final HttpRequest request = respTup.v1();
final HttpResponse response = respTup.v2();
// If the request was modified, then the request has had the token added
final boolean tokenAdded = originalRequest != request;

final Function<HttpRequest, HttpRequest> redactToken = tokenAdded
? req -> req.copy().setHeader(TOKEN_HEADER_NAME, WatcherXContentParser.REDACTED_PASSWORD).build()
: Function.identity();

if (ctx.simulateAction(actionId)) {
// Skip execution, return only the simulated (and redacted if necessary) response
return new WebhookAction.Result.Simulated(redactToken.apply(request));
}

HttpResponse response = httpClient.execute(request);

if (response.status() >= 400) {
return new WebhookAction.Result.Failure(redactToken.apply(request), response);
} else {
return new WebhookAction.Result.Success(redactToken.apply(request), response);
}
}

/**
* Makes any additional modifications to the {@link HttpRequest} if necessary.
* If no modifications are made the same instance is returned, otherwise a new
* HttpRequest is returned.
*/
HttpRequest maybeModifyHttpRequest(HttpRequest request) {
WebhookAccount account = getAccount(NAME);
if (this.additionalTokenEnabled && account.hostTokenMap.size() > 0) {
// Generate a string like example.com:9200 to match against the list of hosts where the
// additional token should be provided. The token will only be added to the headers if
// the request matches the list.
String reqHostAndPort = request.host() + ":" + request.port();
if (Strings.hasText(account.hostTokenMap.get(reqHostAndPort))) {
// Add the additional token
logger.debug(
"additional [{}] header token added to watcher webhook request for {}://{}:{}",
TOKEN_HEADER_NAME,
request.scheme().scheme(),
request.host(),
request.port()
);
return request.copy().setHeader(TOKEN_HEADER_NAME, account.hostTokenMap.get(reqHostAndPort)).build();
}
}
return request;
}

/**
* Executes the given {@link HttpRequest} after any necessary modifications.
* A tuple of the modified (or unmodified) {@link HttpRequest} and
* {@link HttpResponse} is returned.
*/
public Tuple<HttpRequest, HttpResponse> modifyAndExecuteHttpRequest(HttpRequest request) throws IOException {
final HttpRequest modifiedRequest = maybeModifyHttpRequest(request);
final HttpResponse response = httpClient.execute(modifiedRequest);
logger.debug(
"executed watcher webhook request for {}://{}:{}, response code: {}",
modifiedRequest.scheme().scheme(),
modifiedRequest.host(),
modifiedRequest.port(),
response.status()
);
return Tuple.tuple(modifiedRequest, response);
}

public static final class WebhookAccount {
private final Map<String, String> hostTokenMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.core.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpRequest;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.HttpResponse;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.notification.WebhookService;
import org.elasticsearch.xpack.watcher.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.support.Variables;

Expand All @@ -34,11 +34,11 @@ public interface Fields {
}

public static final String TYPE = "http";
private final HttpClient httpClient;
private final WebhookService webhookService;
private final TextTemplateEngine templateEngine;

public HttpEmailAttachementParser(HttpClient httpClient, TextTemplateEngine templateEngine) {
this.httpClient = httpClient;
public HttpEmailAttachementParser(WebhookService webhookService, TextTemplateEngine templateEngine) {
this.webhookService = webhookService;
this.templateEngine = templateEngine;
}

Expand Down Expand Up @@ -82,7 +82,7 @@ public Attachment toAttachment(WatchExecutionContext context, Payload payload, H
Map<String, Object> model = Variables.createCtxParamsMap(context, payload);
HttpRequest httpRequest = attachment.getRequestTemplate().render(templateEngine, model);

HttpResponse response = httpClient.execute(httpRequest);
HttpResponse response = webhookService.modifyAndExecuteHttpRequest(httpRequest).v2();
// check for status 200, only then append attachment
if (response.status() >= 200 && response.status() < 300) {
if (response.hasContent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@
import org.elasticsearch.xpack.core.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.core.watcher.watch.Payload;
import org.elasticsearch.xpack.watcher.common.http.BasicAuth;
import org.elasticsearch.xpack.watcher.common.http.HttpClient;
import org.elasticsearch.xpack.watcher.common.http.HttpMethod;
import org.elasticsearch.xpack.watcher.common.http.HttpProxy;
import org.elasticsearch.xpack.watcher.common.http.HttpRequest;
import org.elasticsearch.xpack.watcher.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.watcher.common.http.HttpResponse;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.notification.WebhookService;
import org.elasticsearch.xpack.watcher.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.support.Variables;

Expand Down Expand Up @@ -120,20 +120,20 @@ public static List<Setting<?>> getSettings() {
private final Logger logger;
private final TimeValue interval;
private final int retries;
private HttpClient httpClient;
private final WebhookService webhookService;
private final TextTemplateEngine templateEngine;
private boolean warningEnabled = REPORT_WARNING_ENABLED_SETTING.getDefault(Settings.EMPTY);
private final Map<String, String> customWarnings = new ConcurrentHashMap<>(1);

public ReportingAttachmentParser(
Settings settings,
HttpClient httpClient,
WebhookService webhookService,
TextTemplateEngine templateEngine,
ClusterSettings clusterSettings
) {
this.interval = INTERVAL_SETTING.get(settings);
this.retries = RETRIES_SETTING.get(settings);
this.httpClient = httpClient;
this.webhookService = webhookService;
this.templateEngine = templateEngine;
this.logger = LogManager.getLogger(getClass());
clusterSettings.addSettingsUpdateConsumer(REPORT_WARNING_ENABLED_SETTING, this::setWarningEnabled);
Expand Down Expand Up @@ -210,7 +210,7 @@ public Attachment toAttachment(WatchExecutionContext context, Payload payload, R
// IMPORTANT NOTE: This is only a temporary solution until we made the execution of watcher more async
// This still blocks other executions on the thread and we have to get away from that
sleep(sleepMillis, context, attachment);
HttpResponse response = httpClient.execute(pollingRequest);
HttpResponse response = webhookService.modifyAndExecuteHttpRequest(pollingRequest).v2();

if (response.status() == 503) {
// requires us to interval another run, no action to take, except logging
Expand Down Expand Up @@ -322,7 +322,7 @@ private long getSleepMillis(WatchExecutionContext context, ReportingAttachment a
* Trigger the initial report generation and catch possible exceptions
*/
private HttpResponse requestReportGeneration(String watchId, String attachmentId, HttpRequest request) throws IOException {
HttpResponse response = httpClient.execute(request);
HttpResponse response = webhookService.modifyAndExecuteHttpRequest(request).v2();
if (response.status() != 200) {
throw new ElasticsearchException(
"Watch[{}] reporting[{}] Error response when trying to trigger reporting generation "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.netty.handler.codec.http.HttpHeaders;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
Expand All @@ -35,6 +36,7 @@
import org.elasticsearch.xpack.watcher.common.http.HttpResponse;
import org.elasticsearch.xpack.watcher.common.text.TextTemplate;
import org.elasticsearch.xpack.watcher.common.text.TextTemplateEngine;
import org.elasticsearch.xpack.watcher.notification.WebhookService;
import org.elasticsearch.xpack.watcher.notification.email.Attachment;
import org.elasticsearch.xpack.watcher.notification.email.Authentication;
import org.elasticsearch.xpack.watcher.notification.email.Email;
Expand Down Expand Up @@ -62,6 +64,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -92,7 +95,10 @@ public void addEmailAttachmentParsers() {
Map<String, EmailAttachmentParser<? extends EmailAttachmentParser.EmailAttachment>> emailAttachmentParsers = new HashMap<>();
emailAttachmentParsers.put(
HttpEmailAttachementParser.TYPE,
new HttpEmailAttachementParser(httpClient, new MockTextTemplateEngine())
new HttpEmailAttachementParser(
new WebhookService(Settings.EMPTY, httpClient, mockClusterService().getClusterSettings()),
new MockTextTemplateEngine()
)
);
emailAttachmentParsers.put(DataAttachmentParser.TYPE, new DataAttachmentParser());
emailAttachmentParser = new EmailAttachmentsParser(emailAttachmentParsers);
Expand Down Expand Up @@ -547,7 +553,13 @@ public void testThatOneFailedEmailAttachmentResultsInActionFailure() throws Exce

// setup email attachment parsers
Map<String, EmailAttachmentParser<? extends EmailAttachmentParser.EmailAttachment>> attachmentParsers = new HashMap<>();
attachmentParsers.put(HttpEmailAttachementParser.TYPE, new HttpEmailAttachementParser(httpClient, engine));
attachmentParsers.put(
HttpEmailAttachementParser.TYPE,
new HttpEmailAttachementParser(
new WebhookService(Settings.EMPTY, httpClient, mockClusterService().getClusterSettings()),
engine
)
);
EmailAttachmentsParser emailAttachmentsParser = new EmailAttachmentsParser(attachmentParsers);

XContentBuilder builder = jsonBuilder().startObject()
Expand Down Expand Up @@ -666,4 +678,10 @@ public EmailSent send(Email email, Authentication auth, Profile profile, String
}
}

private ClusterService mockClusterService() {
ClusterService clusterService = mock(ClusterService.class);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, Set.of());
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
return clusterService;
}
}
Loading

0 comments on commit 3e55b5b

Please sign in to comment.