Skip to content

Commit

Permalink
[ML] Auditor ensures template is installed before writes (#63286)
Browse files Browse the repository at this point in the history
The ML auditors should not write if the latest template is not present.
Instead, a PUT template request is made and the writes are queued up.
  • Loading branch information
davidkyle committed Oct 6, 2020
1 parent 7405af8 commit 8f4ef40
Show file tree
Hide file tree
Showing 27 changed files with 460 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,77 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.utils.MlIndexAndAlias;
import org.elasticsearch.xpack.core.template.IndexTemplateConfig;

import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public abstract class AbstractAuditor<T extends AbstractAuditMessage> {

private static final Logger logger = LogManager.getLogger(AbstractAuditor.class);

/** Maximum number of audit messages held in the backlog; the oldest is dropped when it is full. */
static final int MAX_BUFFER_SIZE = 1000;

private final OriginSettingClient client;
private final String nodeName;
private final String auditIndex;
private final String templateName;
private final Supplier<PutIndexTemplateRequest> templateSupplier;
private final AbstractAuditMessageFactory<T> messageFactory;
/** Set to {@code true} once the audit index template has been seen in, or put into, the cluster state. */
private final AtomicBoolean hasLatestTemplate;

/**
 * Messages queued while the template is missing. Set to {@code null} from the bulk
 * response callback once the backlog has been written, hence volatile so that the
 * change is visible to other threads.
 */
private volatile Queue<ToXContent> backlog;
private final ClusterService clusterService;
/** Guards against issuing more than one put-template request at a time. */
private final AtomicBoolean putTemplateInProgress;

/**
 * Creates an auditor whose put-template request is built from {@code templateConfig}.
 *
 * @param client         client used for index, bulk and put-template requests
 * @param auditIndex     name of the index audit messages are written to
 * @param templateConfig supplies the template name and the template source (as JSON)
 * @param nodeName       name of this node
 * @param messageFactory factory for the concrete audit message type
 * @param clusterService used to read the current cluster state
 */
protected AbstractAuditor(OriginSettingClient client,
                          String auditIndex,
                          IndexTemplateConfig templateConfig,
                          String nodeName,
                          AbstractAuditMessageFactory<T> messageFactory,
                          ClusterService clusterService) {

    this(client, auditIndex, templateConfig.getTemplateName(),
        () -> new PutIndexTemplateRequest(templateConfig.getTemplateName()).source(templateConfig.loadBytes(), XContentType.JSON),
        nodeName, messageFactory, clusterService);
}


/**
 * Creates an auditor with an explicit template name and put-template request supplier.
 * All arguments must be non-null.
 *
 * @param client           client used for index, bulk and put-template requests
 * @param auditIndex       name of the index audit messages are written to
 * @param templateName     name of the index template that must exist before writing
 * @param templateSupplier builds the request used to install the template when it is missing
 * @param nodeName         name of this node
 * @param messageFactory   factory for the concrete audit message type
 * @param clusterService   used to read the current cluster state
 * @throws NullPointerException if any argument is {@code null}
 */
protected AbstractAuditor(OriginSettingClient client,
                          String auditIndex,
                          String templateName,
                          Supplier<PutIndexTemplateRequest> templateSupplier,
                          String nodeName,
                          AbstractAuditMessageFactory<T> messageFactory,
                          ClusterService clusterService) {
    this.client = Objects.requireNonNull(client);
    this.auditIndex = Objects.requireNonNull(auditIndex);
    this.templateName = Objects.requireNonNull(templateName);
    this.templateSupplier = Objects.requireNonNull(templateSupplier);
    this.messageFactory = Objects.requireNonNull(messageFactory);
    this.clusterService = Objects.requireNonNull(clusterService);
    this.nodeName = Objects.requireNonNull(nodeName);
    this.backlog = new ConcurrentLinkedQueue<>();
    this.hasLatestTemplate = new AtomicBoolean();
    this.putTemplateInProgress = new AtomicBoolean();
}

public void info(String resourceId, String message) {
Expand All @@ -64,16 +102,74 @@ private void onIndexFailure(Exception exception) {
}

/**
 * Writes an audit message, but only once the audit index template is known to be installed.
 *
 * If the template is already flagged as present, or is found in the current cluster state,
 * the document is written straight away. Otherwise the document is added to the backlog
 * (dropping the oldest entry when {@link #MAX_BUFFER_SIZE} is reached) and, unless a request
 * is already in flight, a put-template request is issued. The backlog is written when that
 * request succeeds.
 *
 * @param toXContent the audit message to index
 */
private void indexDoc(ToXContent toXContent) {
    if (hasLatestTemplate.get()) {
        writeDoc(toXContent);
        return;
    }

    if (MlIndexAndAlias.hasIndexTemplate(clusterService.state(), templateName)) {
        synchronized (this) {
            // synchronized so nothing can be added to backlog while this value changes
            hasLatestTemplate.set(true);
        }
        writeDoc(toXContent);
        return;
    }

    ActionListener<Boolean> putTemplateListener = ActionListener.wrap(
        r -> {
            synchronized (this) {
                // synchronized so nothing can be added to backlog while this value changes
                hasLatestTemplate.set(true);
            }
            logger.info("Auditor template [{}] successfully installed", templateName);
            writeBacklog();
            putTemplateInProgress.set(false);
        },
        e -> {
            // Pass the exception to the logger so the cause of the failure is not lost
            logger.warn("Error putting latest template [" + templateName + "]", e);
            // Allow the next message to trigger another attempt
            putTemplateInProgress.set(false);
        }
    );

    synchronized (this) {
        if (hasLatestTemplate.get() == false) {
            // synchronized so that hasLatestTemplate does not change value
            // between the read and adding to the backlog

            // Read the field once: it is set to null from the bulk response callback,
            // so repeated reads could observe different values.
            Queue<ToXContent> pending = backlog;
            assert pending != null;
            if (pending != null) {
                if (pending.size() >= MAX_BUFFER_SIZE) {
                    // Buffer is full, drop the oldest message
                    pending.remove();
                }
                pending.add(toXContent);
            } else {
                logger.error("Latest audit template missing but the back log has been written");
            }

            // stop multiple invocations
            if (putTemplateInProgress.compareAndSet(false, true)) {
                MlIndexAndAlias.installIndexTemplateIfRequired(clusterService.state(), client, templateSupplier.get(),
                    putTemplateListener);
            }
            return;
        }
    }

    // The template was flagged as installed while waiting for the lock; retry,
    // which now takes the direct write path at the top of this method.
    indexDoc(toXContent);
}

/**
 * Sends a single audit message to the audit index. The outcome is reported
 * asynchronously to {@code onIndexResponse} or {@code onIndexFailure}.
 *
 * @param toXContent the audit message to index
 */
private void writeDoc(ToXContent toXContent) {
    ActionListener<IndexResponse> responseHandler = ActionListener.wrap(this::onIndexResponse, this::onIndexFailure);
    client.index(indexRequest(toXContent), responseHandler);
}

/**
 * Builds the request that indexes {@code toXContent} into the audit index with a 5 second timeout.
 *
 * This method only constructs the request; sending it is the caller's job
 * (a single index call or as part of a bulk request), so it must not dispatch
 * the request itself or each document would be written twice.
 *
 * @param toXContent the audit message to index
 * @return the populated, unsent index request
 */
private IndexRequest indexRequest(ToXContent toXContent) {
    IndexRequest indexRequest = new IndexRequest(auditIndex);
    indexRequest.source(toXContentBuilder(toXContent));
    indexRequest.timeout(TimeValue.timeValueSeconds(5));
    return indexRequest;
}

private XContentBuilder toXContentBuilder(ToXContent toXContent) {
Expand All @@ -83,4 +179,36 @@ private XContentBuilder toXContentBuilder(ToXContent toXContent) {
throw new RuntimeException(e);
}
}

/**
 * Drains every queued audit message into one bulk request and sends it.
 * When the bulk response arrives the backlog reference is cleared; a failure of the
 * bulk call itself is passed to {@code onIndexFailure}.
 */
private void writeBacklog() {
    assert backlog != null;
    if (backlog == null) {
        logger.error("Message back log has already been written");
        return;
    }

    BulkRequest queuedMessages = new BulkRequest();
    for (ToXContent queued = backlog.poll(); queued != null; queued = backlog.poll()) {
        queuedMessages.add(indexRequest(queued));
    }

    client.bulk(queuedMessages, ActionListener.wrap(
        response -> {
            if (response.hasFailures() == false) {
                logger.trace("Successfully wrote audit message backlog after upgrading template");
            } else {
                logger.warn("Failures bulk indexing the message back log: {}", response.buildFailureMessage());
            }
            // The backlog is no longer needed once a bulk response has been received
            backlog = null;
        },
        this::onIndexFailure
    ));
}

/**
 * For testing: the number of messages currently held in the backlog.
 *
 * @return the backlog size, or 0 once the backlog has been written and cleared
 */
int backLogSize() {
    // The field is set to null after the backlog has been bulk indexed
    Queue<ToXContent> pending = backlog;
    return pending == null ? 0 : pending.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,35 @@ public static void installIndexTemplateIfRequired(

PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName)
.source(templateConfig.loadBytes(), XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1));

installIndexTemplateIfRequired(clusterState, client, request, listener);
}

/**
* See {@link #installIndexTemplateIfRequired(ClusterState, Client, IndexTemplateConfig, ActionListener)}.
*
* Overload takes a {@code PutIndexTemplateRequest} instead of {@code IndexTemplateConfig}
*
* @param clusterState The cluster state
* @param client For putting the template
* @param templateRequest The Put template request
* @param listener Async listener
*/
public static void installIndexTemplateIfRequired(
ClusterState clusterState,
Client client,
PutIndexTemplateRequest templateRequest,
ActionListener<Boolean> listener
) {
String templateName = templateRequest.name();

// The check for existence of the template is against the cluster state, so very cheap
if (hasIndexTemplate(clusterState, templateRequest.name())) {
listener.onResponse(true);
return;
}

templateRequest.masterNodeTimeout(TimeValue.timeValueMinutes(1));

ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(
response -> {
Expand All @@ -286,11 +314,11 @@ public static void installIndexTemplateIfRequired(
},
listener::onFailure);

executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, request, innerListener,
executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, templateRequest, innerListener,
client.admin().indices()::putTemplate);
}

private static boolean hasIndexTemplate(ClusterState state, String templateName) {
public static boolean hasIndexTemplate(ClusterState state, String templateName) {
return state.getMetadata().getTemplates().containsKey(templateName);
}
}

0 comments on commit 8f4ef40

Please sign in to comment.