[Monitoring] Make Exporters Async (#35765)

This changes the exporter code -- most notably the `http` exporter --
to use async operations throughout the resource management and bulk
initialization code (the bulk indexing of monitoring documents was
already async).

This also changes one semi-core aspect of the `HttpResource` class:
`HttpResource::checkAndPublishIfDirty` no longer blocks all concurrent
calls until the first call completes. Instead, any parallel attempt to
check the resources is skipped until the first call completes (success
or failure). While this is a behavioral change, it has very little
practical impact: previously, either the first call succeeded quickly
(and every blocked request then proceeded) or each blocked request
timed out and failed anyway, effectively being skipped while also
burdening the system.
pickypg committed Nov 27, 2018
1 parent c2329cd commit 3337fa7
Showing 31 changed files with 1,291 additions and 1,038 deletions.
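
To illustrate the skip-instead-of-block semantics described in the commit message, here is a minimal sketch. The class and field names are invented for the example, and the real `HttpResource` coordinates an asynchronous check rather than a synchronous method like this:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified, synchronous sketch of skip-instead-of-block (hypothetical names).
class NonBlockingResourceCheck {

    // set while a check/publish attempt is in flight
    private final AtomicBoolean checkInProgress = new AtomicBoolean(false);

    /** Returns false when skipped because another caller owns the in-flight check. */
    boolean checkAndPublishIfDirty() {
        // only the first concurrent caller wins; the rest are skipped, not blocked
        if (checkInProgress.compareAndSet(false, true)) {
            try {
                return doCheckAndPublish();
            } finally {
                // allow the next attempt whether this one succeeded or failed
                checkInProgress.set(false);
            }
        }
        return false;
    }

    private boolean doCheckAndPublish() {
        return true; // placeholder for the real check/publish round-trip
    }
}
```
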
MonitoringFeatureSet.java
@@ -75,7 +75,7 @@ static Map<String, Object> exportersUsage(Exporters exporters) {
             return null;
         }
         Map<String, Object> usage = new HashMap<>();
-        for (Exporter exporter : exporters) {
+        for (Exporter exporter : exporters.getEnabledExporters()) {
             if (exporter.config().enabled()) {
                 String type = exporter.config().type();
                 int count = (Integer) usage.getOrDefault(type, 0);
MonitoringService.java
@@ -20,7 +20,6 @@
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
 import org.elasticsearch.xpack.monitoring.collector.Collector;
-import org.elasticsearch.xpack.monitoring.exporter.Exporter;
 import org.elasticsearch.xpack.monitoring.exporter.Exporters;
 
 import java.io.Closeable;
@@ -174,14 +173,7 @@ protected void doStop() {
     protected void doClose() {
         logger.debug("monitoring service is closing");
         monitor.close();
-
-        for (Exporter exporter : exporters) {
-            try {
-                exporter.close();
-            } catch (Exception e) {
-                logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to close exporter [{}]", exporter.name()), e);
-            }
-        }
+        exporters.close();
         logger.debug("monitoring service closed");
     }
 
Exporter.java
@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.monitoring.exporter;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -93,17 +94,18 @@ public boolean isSingleton() {
     }
 
     /**
-     * Opens up a new export bulk. May return {@code null} indicating this exporter is not ready
-     * yet to export the docs
+     * Opens up a new export bulk.
+     *
+     * @param listener Receives {@code null} if this exporter is not ready to export the docs.
      */
-    public abstract ExportBulk openBulk();
+    public abstract void openBulk(ActionListener<ExportBulk> listener);
 
     protected final boolean isClosed() {
         return closed.get();
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         if (closed.compareAndSet(false, true)) {
             doClose();
         }
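
For callers, the new `openBulk` contract has three outcomes: a ready bulk, a `null` "not ready" signal, and a failure. A minimal caller-side sketch, assuming the surrounding class lives in the same package (the handling comments are illustrative):

```java
import org.elasticsearch.action.ActionListener;

// Illustrative caller of the async openBulk contract.
class OpenBulkCaller {

    void collectBulk(final Exporter exporter) {
        exporter.openBulk(ActionListener.wrap(
            bulk -> {
                if (bulk == null) {
                    // exporter is not ready yet; skip it for this round
                } else {
                    // hand the bulk off for indexing
                }
            },
            e -> {
                // opening the bulk failed; log and move on
            }));
    }
}
```
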
Exporters.java
@@ -17,7 +17,10 @@
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.settings.SettingsException;
+import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.license.XPackLicenseState;
 import org.elasticsearch.xpack.monitoring.exporter.http.HttpExporter;
 import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc;
@@ -27,7 +30,6 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -36,7 +38,7 @@
 
 import static java.util.Collections.emptyMap;
 
-public class Exporters extends AbstractLifecycleComponent implements Iterable<Exporter> {
+public class Exporters extends AbstractLifecycleComponent {
     private static final Logger logger = LogManager.getLogger(Exporters.class);
 
     private final Settings settings;
@@ -90,9 +92,13 @@ public Exporter getExporter(String name) {
         return exporters.get().get(name);
     }
 
-    @Override
-    public Iterator<Exporter> iterator() {
-        return exporters.get().values().iterator();
+    /**
+     * Get all enabled {@linkplain Exporter}s.
+     *
+     * @return Never {@code null}. Can be empty if none are enabled.
+     */
+    public Collection<Exporter> getEnabledExporters() {
+        return exporters.get().values();
     }
 
     static void closeExporters(Logger logger, Map<String, Exporter> exporters) {
@@ -105,31 +111,6 @@ static void closeExporters(Logger logger, Map<String, Exporter> exporters) {
         }
     }
 
-    ExportBulk openBulk() {
-        final ClusterState state = clusterService.state();
-
-        if (ClusterState.UNKNOWN_UUID.equals(state.metaData().clusterUUID()) || state.version() == ClusterState.UNKNOWN_VERSION) {
-            logger.trace("skipping exporters because the cluster state is not loaded");
-            return null;
-        }
-
-        List<ExportBulk> bulks = new ArrayList<>();
-        for (Exporter exporter : this) {
-            try {
-                ExportBulk bulk = exporter.openBulk();
-                if (bulk == null) {
-                    logger.debug("skipping exporter [{}] as it is not ready yet", exporter.name());
-                } else {
-                    bulks.add(bulk);
-                }
-            } catch (Exception e) {
-                logger.error(
-                    (Supplier<?>) () -> new ParameterizedMessage("exporter [{}] failed to open exporting bulk", exporter.name()), e);
-            }
-        }
-        return bulks.isEmpty() ? null : new ExportBulk.Compound(bulks, threadContext);
-    }
-
     Map<String, Exporter> initExporters(Settings settings) {
         Set<String> singletons = new HashSet<>();
         Map<String, Exporter> exporters = new HashMap<>();
@@ -180,38 +161,81 @@ Map<String, Exporter> initExporters(Settings settings) {
         return exporters;
     }
 
+    /**
+     * Wrap every {@linkplain Exporter}'s {@linkplain ExportBulk} in a {@linkplain ExportBulk.Compound}.
+     *
+     * @param listener Receives {@code null} if no exporters are ready or available.
+     */
+    void wrapExportBulk(final ActionListener<ExportBulk> listener) {
+        final ClusterState state = clusterService.state();
+
+        // wait until we have a usable cluster state
+        if (state.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) ||
+            ClusterState.UNKNOWN_UUID.equals(state.metaData().clusterUUID()) ||
+            state.version() == ClusterState.UNKNOWN_VERSION) {
+            logger.trace("skipping exporters because the cluster state is not loaded");
+
+            listener.onResponse(null);
+            return;
+        }
+
+        final Map<String, Exporter> exporterMap = exporters.get();
+        final AtomicArray<ExportBulk> accumulatedBulks = new AtomicArray<>(exporterMap.size());
+        final CountDown countDown = new CountDown(exporterMap.size());
+
+        int i = 0;
+
+        // get every exporter's ExportBulk and, when they've all responded, respond with a wrapped version
+        for (final Exporter exporter : exporterMap.values()) {
+            exporter.openBulk(
+                new AccumulatingExportBulkActionListener(exporter.name(), i++, accumulatedBulks, countDown, threadContext, listener));
+        }
+    }
+
     /**
      * Exports a collection of monitoring documents using the configured exporters
      */
-    public void export(Collection<MonitoringDoc> docs, ActionListener<Void> listener) throws ExportException {
+    public void export(final Collection<MonitoringDoc> docs, final ActionListener<Void> listener) throws ExportException {
         if (this.lifecycleState() != Lifecycle.State.STARTED) {
             listener.onFailure(new ExportException("Export service is not started"));
         } else if (docs != null && docs.size() > 0) {
-            final ExportBulk bulk = openBulk();
-
-            if (bulk != null) {
-                final AtomicReference<ExportException> exceptionRef = new AtomicReference<>();
-                try {
-                    bulk.add(docs);
-                } catch (ExportException e) {
-                    exceptionRef.set(e);
-                } finally {
-                    bulk.close(lifecycleState() == Lifecycle.State.STARTED, ActionListener.wrap(r -> {
-                        if (exceptionRef.get() == null) {
-                            listener.onResponse(null);
-                        } else {
-                            listener.onFailure(exceptionRef.get());
-                        }
-                    }, listener::onFailure));
-                }
-            } else {
-                listener.onResponse(null);
-            }
+            wrapExportBulk(ActionListener.wrap(bulk -> {
+                if (bulk != null) {
+                    doExport(bulk, docs, listener);
+                } else {
+                    listener.onResponse(null);
+                }
+            }, listener::onFailure));
         } else {
             listener.onResponse(null);
         }
     }
 
+    /**
+     * Add {@code docs} and send the {@code bulk}, then respond to the {@code listener}.
+     *
+     * @param bulk The bulk object to send {@code docs} through.
+     * @param docs The monitoring documents to send.
+     * @param listener Notified with {@code null} when complete, or with the failure where relevant.
+     */
+    private void doExport(final ExportBulk bulk, final Collection<MonitoringDoc> docs, final ActionListener<Void> listener) {
+        final AtomicReference<ExportException> exceptionRef = new AtomicReference<>();
+
+        try {
+            bulk.add(docs);
+        } catch (ExportException e) {
+            exceptionRef.set(e);
+        } finally {
+            bulk.close(lifecycleState() == Lifecycle.State.STARTED, ActionListener.wrap(r -> {
+                if (exceptionRef.get() == null) {
+                    listener.onResponse(null);
+                } else {
+                    listener.onFailure(exceptionRef.get());
+                }
+            }, listener::onFailure));
+        }
+    }
+
     /**
      * Return all the settings of all the exporters, no matter if HTTP or Local
      */
@@ -221,4 +245,66 @@ public static List<Setting.AffixSetting<?>> getSettings() {
         settings.addAll(HttpExporter.getSettings());
         return settings;
     }
+
+    /**
+     * {@code AccumulatingExportBulkActionListener} allows us to asynchronously gather all of the {@linkplain ExportBulk}s that are
+     * ready, as associated with the enabled {@linkplain Exporter}s.
+     */
+    static class AccumulatingExportBulkActionListener implements ActionListener<ExportBulk> {
+
+        private final String name;
+        private final int indexPosition;
+        private final AtomicArray<ExportBulk> accumulatedBulks;
+        private final CountDown countDown;
+        private final ActionListener<ExportBulk> delegate;
+        private final ThreadContext threadContext;
+
+        AccumulatingExportBulkActionListener(final String name,
+                                             final int indexPosition, final AtomicArray<ExportBulk> accumulatedBulks,
+                                             final CountDown countDown,
+                                             final ThreadContext threadContext, final ActionListener<ExportBulk> delegate) {
+            this.name = name;
+            this.indexPosition = indexPosition;
+            this.accumulatedBulks = accumulatedBulks;
+            this.countDown = countDown;
+            this.threadContext = threadContext;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void onResponse(final ExportBulk exportBulk) {
+            if (exportBulk == null) {
+                logger.debug("skipping exporter [{}] as it is not ready yet", name);
+            } else {
+                accumulatedBulks.set(indexPosition, exportBulk);
+            }
+
+            delegateIfComplete();
+        }
+
+        @Override
+        public void onFailure(Exception e) {
+            logger.error((Supplier<?>) () -> new ParameterizedMessage("exporter [{}] failed to open exporting bulk", name), e);
+
+            delegateIfComplete();
+        }
+
+        /**
+         * Once all {@linkplain Exporter}s have responded, whether with success or failure, this responds with all successful
+         * {@linkplain ExportBulk}s wrapped in an {@linkplain ExportBulk.Compound}.
+         */
+        void delegateIfComplete() {
+            if (countDown.countDown()) {
+                final List<ExportBulk> bulkList = accumulatedBulks.asList();
+
+                if (bulkList.isEmpty()) {
+                    delegate.onResponse(null);
+                } else {
+                    delegate.onResponse(new ExportBulk.Compound(bulkList, threadContext));
+                }
+            }
+        }
+
+    }
+
 }
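
The fan-out/fan-in that `wrapExportBulk` and `AccumulatingExportBulkActionListener` implement is easier to see in isolation. Below is a standalone sketch of the same pattern with invented names, using plain JDK types in place of `AtomicArray` and `CountDown`: fire N asynchronous calls, record each non-null result by slot, and invoke the delegate exactly once when the final callback (success or failure) arrives.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Standalone sketch of the accumulate-then-delegate pattern (hypothetical names).
final class FanIn<T> {
    private final AtomicReferenceArray<T> results;   // stands in for AtomicArray<ExportBulk>
    private final AtomicInteger remaining;           // stands in for CountDown
    private final Consumer<List<T>> delegate;        // stands in for the wrapped listener

    FanIn(int participants, Consumer<List<T>> delegate) {
        this.results = new AtomicReferenceArray<>(participants);
        this.remaining = new AtomicInteger(participants);
        this.delegate = delegate;
    }

    // called on success; a null result means "skip this participant"
    void onResponse(int slot, T result) {
        if (result != null) {
            results.set(slot, result);
        }
        countDown();
    }

    // called on failure; the failed slot simply stays empty
    void onFailure(int slot, Exception e) {
        countDown();
    }

    private void countDown() {
        // the thread that decrements to zero fires the delegate exactly once
        if (remaining.decrementAndGet() == 0) {
            delegate.accept(IntStream.range(0, results.length())
                .mapToObj(results::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toList()));
        }
    }
}
```
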
ClusterAlertHttpResource.java
@@ -13,6 +13,7 @@
 import org.apache.http.entity.StringEntity;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.common.CheckedFunction;
@@ -79,34 +80,33 @@ public ClusterAlertHttpResource(final String resourceOwnerName,
      * Determine if the current {@linkplain #watchId Watch} exists.
      */
     @Override
-    protected CheckResponse doCheck(final RestClient client) {
+    protected void doCheck(final RestClient client, final ActionListener<Boolean> listener) {
         // if we should be adding, then we need to check for existence
         if (isWatchDefined() && licenseState.isMonitoringClusterAlertsAllowed()) {
             final CheckedFunction<Response, Boolean, IOException> watchChecker =
                 (response) -> shouldReplaceClusterAlert(response, XContentType.JSON.xContent(), LAST_UPDATED_VERSION);
 
-            return versionCheckForResource(client, logger,
-                                           "/_xpack/watcher/watch", watchId.get(), "monitoring cluster alert",
-                                           resourceOwnerName, "monitoring cluster",
-                                           watchChecker);
+            checkForResource(client, listener, logger,
+                             "/_xpack/watcher/watch", watchId.get(), "monitoring cluster alert",
+                             resourceOwnerName, "monitoring cluster",
+                             GET_EXISTS, GET_DOES_NOT_EXIST,
+                             watchChecker, this::alwaysReplaceResource);
+        } else {
+            // if we should be deleting, then just try to delete it (same level of effort as checking)
+            deleteResource(client, listener, logger, "/_xpack/watcher/watch", watchId.get(),
+                           "monitoring cluster alert",
+                           resourceOwnerName, "monitoring cluster");
         }
-
-        // if we should be deleting, then just try to delete it (same level of effort as checking)
-        final boolean deleted = deleteResource(client, logger, "/_xpack/watcher/watch", watchId.get(),
-                                               "monitoring cluster alert",
-                                               resourceOwnerName, "monitoring cluster");
-
-        return deleted ? CheckResponse.EXISTS : CheckResponse.ERROR;
     }
 
     /**
      * Publish the missing {@linkplain #watchId Watch}.
      */
     @Override
-    protected boolean doPublish(final RestClient client) {
-        return putResource(client, logger,
-                           "/_xpack/watcher/watch", watchId.get(), this::watchToHttpEntity, "monitoring cluster alert",
-                           resourceOwnerName, "monitoring cluster");
+    protected void doPublish(final RestClient client, final ActionListener<Boolean> listener) {
+        putResource(client, listener, logger,
+                    "/_xpack/watcher/watch", watchId.get(), this::watchToHttpEntity, "monitoring cluster alert",
+                    resourceOwnerName, "monitoring cluster");
     }
 
     /**
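
The resource checks above ride on the low-level REST client's asynchronous API. A rough sketch of that shape follows; the helper name and the 404 handling are illustrative, not the exporter's actual `checkForResource` implementation:

```java
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.RestClient;

// Illustrative async existence check: responds true if found, false on 404.
final class AsyncResourceCheck {

    static void resourceExists(RestClient client, String endpoint, ActionListener<Boolean> listener) {
        client.performRequestAsync(new Request("GET", endpoint), new ResponseListener() {
            @Override
            public void onSuccess(Response response) {
                listener.onResponse(true); // resource is there
            }

            @Override
            public void onFailure(Exception exception) {
                // a 404 is a definitive "does not exist", not an error
                if (exception instanceof ResponseException &&
                    ((ResponseException) exception).getResponse().getStatusLine().getStatusCode() == 404) {
                    listener.onResponse(false);
                } else {
                    listener.onFailure(exception);
                }
            }
        });
    }
}
```
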