Skip to content

Commit

Permalink
[Connector API] Simplify updating draft filtering rules (#107364)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb committed Apr 12, 2024
1 parent 96b513a commit b85b9dc
Show file tree
Hide file tree
Showing 12 changed files with 503 additions and 445 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,26 +43,23 @@
*/
public class ConnectorFiltering implements Writeable, ToXContentObject {

private final FilteringRules active;
private final String domain;
private final FilteringRules draft;
private FilteringRules active;
private final String domain = "DEFAULT"; // Connectors always use DEFAULT domain, users should not modify it via API
private FilteringRules draft;

/**
* Constructs a new ConnectorFiltering instance.
*
* @param active The active filtering rules.
* @param domain The domain associated with the filtering.
* @param draft The draft filtering rules.
*/
public ConnectorFiltering(FilteringRules active, String domain, FilteringRules draft) {
public ConnectorFiltering(FilteringRules active, FilteringRules draft) {
this.active = active;
this.domain = domain;
this.draft = draft;
}

public ConnectorFiltering(StreamInput in) throws IOException {
this.active = new FilteringRules(in);
this.domain = in.readString();
this.draft = new FilteringRules(in);
}

Expand All @@ -78,22 +75,27 @@ public FilteringRules getDraft() {
return draft;
}

public ConnectorFiltering setActive(FilteringRules active) {
this.active = active;
return this;
}

public ConnectorFiltering setDraft(FilteringRules draft) {
this.draft = draft;
return this;
}

private static final ParseField ACTIVE_FIELD = new ParseField("active");
private static final ParseField DOMAIN_FIELD = new ParseField("domain");
private static final ParseField DRAFT_FIELD = new ParseField("draft");

private static final ConstructingObjectParser<ConnectorFiltering, Void> PARSER = new ConstructingObjectParser<>(
"connector_filtering",
true,
args -> new ConnectorFiltering.Builder().setActive((FilteringRules) args[0])
.setDomain((String) args[1])
.setDraft((FilteringRules) args[2])
.build()
args -> new ConnectorFiltering.Builder().setActive((FilteringRules) args[0]).setDraft((FilteringRules) args[1]).build()
);

static {
PARSER.declareObject(constructorArg(), (p, c) -> FilteringRules.fromXContent(p), ACTIVE_FIELD);
PARSER.declareString(constructorArg(), DOMAIN_FIELD);
PARSER.declareObject(constructorArg(), (p, c) -> FilteringRules.fromXContent(p), DRAFT_FIELD);
}

Expand All @@ -102,7 +104,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.startObject();
{
builder.field(ACTIVE_FIELD.getPreferredName(), active);
builder.field(DOMAIN_FIELD.getPreferredName(), domain);
builder.field("domain", domain); // We still want to write the DEFAULT domain to the index
builder.field(DRAFT_FIELD.getPreferredName(), draft);
}
builder.endObject();
Expand All @@ -124,7 +126,6 @@ public static ConnectorFiltering fromXContentBytes(BytesReference source, XConte
@Override
public void writeTo(StreamOutput out) throws IOException {
active.writeTo(out);
out.writeString(domain);
draft.writeTo(out);
}

Expand All @@ -141,32 +142,75 @@ public int hashCode() {
return Objects.hash(active, domain, draft);
}

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<List<ConnectorFiltering>, Void> CONNECTOR_FILTERING_PARSER =
new ConstructingObjectParser<>(
"connector_filtering_parser",
true,
args -> (List<ConnectorFiltering>) args[0]

);

static {
CONNECTOR_FILTERING_PARSER.declareObjectArray(
constructorArg(),
(p, c) -> ConnectorFiltering.fromXContent(p),
Connector.FILTERING_FIELD
);
}

/**
* Deserializes the {@link ConnectorFiltering} property from a {@link Connector} byte representation.
*
* @param source Byte representation of the {@link Connector}.
* @param xContentType {@link XContentType} of the content (e.g., JSON).
* @return List of {@link ConnectorFiltering} objects.
*/
public static List<ConnectorFiltering> fromXContentBytesConnectorFiltering(BytesReference source, XContentType xContentType) {
try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, source, xContentType)) {
return CONNECTOR_FILTERING_PARSER.parse(parser, null);
} catch (IOException e) {
throw new ElasticsearchParseException("Failed to parse a connector filtering.", e);
}
}

public static class Builder {

private FilteringRules active;
private String domain;
private FilteringRules draft;

public Builder setActive(FilteringRules active) {
this.active = active;
return this;
}

public Builder setDomain(String domain) {
this.domain = domain;
return this;
}

public Builder setDraft(FilteringRules draft) {
this.draft = draft;
return this;
}

public ConnectorFiltering build() {
return new ConnectorFiltering(active, domain, draft);
return new ConnectorFiltering(active, draft);
}
}

public static boolean isDefaultRulePresentInFilteringRules(List<FilteringRule> rules) {
FilteringRule defaultRule = getDefaultFilteringRule(null);
return rules.stream().anyMatch(rule -> rule.equalsExceptForTimestampsAndOrder(defaultRule));
}

public static FilteringRule getDefaultFilteringRule(Instant timestamp) {
return new FilteringRule.Builder().setCreatedAt(timestamp)
.setField("_")
.setId("DEFAULT")
.setOrder(0)
.setPolicy(FilteringPolicy.INCLUDE)
.setRule(FilteringRuleCondition.REGEX)
.setUpdatedAt(timestamp)
.setValue(".*")
.build();
}

public static ConnectorFiltering getDefaultConnectorFilteringConfig() {

Instant currentTimestamp = Instant.now();
Expand All @@ -178,47 +222,22 @@ public static ConnectorFiltering getDefaultConnectorFilteringConfig() {
.setAdvancedSnippetValue(Collections.emptyMap())
.build()
)
.setRules(
List.of(
new FilteringRule.Builder().setCreatedAt(currentTimestamp)
.setField("_")
.setId("DEFAULT")
.setOrder(0)
.setPolicy(FilteringPolicy.INCLUDE)
.setRule(FilteringRuleCondition.REGEX)
.setUpdatedAt(currentTimestamp)
.setValue(".*")
.build()
)
)
.setRules(List.of(getDefaultFilteringRule(currentTimestamp)))
.setFilteringValidationInfo(
new FilteringValidationInfo.Builder().setValidationErrors(Collections.emptyList())
.setValidationState(FilteringValidationState.VALID)
.build()
)
.build()
)
.setDomain("DEFAULT")
.setDraft(
new FilteringRules.Builder().setAdvancedSnippet(
new FilteringAdvancedSnippet.Builder().setAdvancedSnippetCreatedAt(currentTimestamp)
.setAdvancedSnippetUpdatedAt(currentTimestamp)
.setAdvancedSnippetValue(Collections.emptyMap())
.build()
)
.setRules(
List.of(
new FilteringRule.Builder().setCreatedAt(currentTimestamp)
.setField("_")
.setId("DEFAULT")
.setOrder(0)
.setPolicy(FilteringPolicy.INCLUDE)
.setRule(FilteringRuleCondition.REGEX)
.setUpdatedAt(currentTimestamp)
.setValue(".*")
.build()
)
)
.setRules(List.of(getDefaultFilteringRule(currentTimestamp)))
.setFilteringValidationInfo(
new FilteringValidationInfo.Builder().setValidationErrors(Collections.emptyList())
.setValidationState(FilteringValidationState.VALID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.action.PostConnectorAction;
import org.elasticsearch.xpack.application.connector.action.PutConnectorAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorApiKeyIdAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorConfigurationAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorIndexNameAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorNameAction;
Expand All @@ -54,6 +54,10 @@
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorServiceTypeAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorStatusAction;
import org.elasticsearch.xpack.application.connector.filtering.FilteringAdvancedSnippet;
import org.elasticsearch.xpack.application.connector.filtering.FilteringRule;
import org.elasticsearch.xpack.application.connector.filtering.FilteringRules;
import org.elasticsearch.xpack.application.connector.filtering.FilteringValidationInfo;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJob;
import org.elasticsearch.xpack.application.connector.syncjob.ConnectorSyncJobIndexService;

Expand All @@ -70,6 +74,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.application.connector.ConnectorFiltering.fromXContentBytesConnectorFiltering;

/**
* A service that manages persistent {@link Connector} configurations.
Expand Down Expand Up @@ -555,19 +560,19 @@ public void updateConnectorNameOrDescription(UpdateConnectorNameAction.Request r
}

/**
* Updates the {@link ConnectorFiltering} property of a {@link Connector}.
* Sets the {@link ConnectorFiltering} property of a {@link Connector}.
*
* @param request Request for updating connector filtering property.
* @param listener Listener to respond to a successful response or an error.
* @param connectorId The ID of the {@link Connector} to update.
* @param filtering The list of {@link ConnectorFiltering} .
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request request, ActionListener<UpdateResponse> listener) {
public void updateConnectorFiltering(String connectorId, List<ConnectorFiltering> filtering, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), request.getFiltering()))
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), filtering))
);
client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
Expand All @@ -581,6 +586,64 @@ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request requ
}
}

/**
* Updates the draft filtering in a given {@link Connector}.
*
* @param connectorId The ID of the {@link Connector} to be updated.
* @param advancedSnippet An instance of {@link FilteringAdvancedSnippet}.
* @param rules A list of instances of {@link FilteringRule} to be applied.
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorFilteringDraft(
String connectorId,
FilteringAdvancedSnippet advancedSnippet,
List<FilteringRule> rules,
ActionListener<UpdateResponse> listener
) {
try {
getConnector(connectorId, listener.delegateFailure((l, connector) -> {
List<ConnectorFiltering> connectorFilteringList = fromXContentBytesConnectorFiltering(
connector.getSourceRef(),
XContentType.JSON
);
// Connectors represent their filtering configuration as a singleton list
ConnectorFiltering connectorFilteringSingleton = connectorFilteringList.get(0);

// If advanced snippet or rules are not defined, keep the current draft state
FilteringAdvancedSnippet newDraftAdvancedSnippet = advancedSnippet == null
? connectorFilteringSingleton.getDraft().getAdvancedSnippet()
: advancedSnippet;

List<FilteringRule> newDraftRules = rules == null ? connectorFilteringSingleton.getDraft().getRules() : rules;

ConnectorFiltering connectorFilteringWithUpdatedDraft = connectorFilteringSingleton.setDraft(
new FilteringRules.Builder().setRules(newDraftRules)
.setAdvancedSnippet(newDraftAdvancedSnippet)
.setFilteringValidationInfo(FilteringValidationInfo.getInitialDraftValidationInfo())
.build()
);

final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), List.of(connectorFilteringWithUpdatedDraft)))
);

client.update(updateRequest, new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (ll, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
ll.onFailure(new ResourceNotFoundException(connectorNotFoundErrorMsg(connectorId)));
return;
}
ll.onResponse(updateResponse);
}));
}));

} catch (Exception e) {
listener.onFailure(e);
}
}

/**
* Updates the lastSeen property of a {@link Connector}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.application.connector.ConnectorFiltering;
import org.elasticsearch.xpack.application.connector.ConnectorIndexService;
import org.elasticsearch.xpack.application.connector.filtering.FilteringAdvancedSnippet;
import org.elasticsearch.xpack.application.connector.filtering.FilteringRule;

import java.util.List;

public class TransportUpdateConnectorFilteringAction extends HandledTransportAction<
UpdateConnectorFilteringAction.Request,
Expand Down Expand Up @@ -47,6 +52,27 @@ protected void doExecute(
UpdateConnectorFilteringAction.Request request,
ActionListener<ConnectorUpdateActionResponse> listener
) {
connectorIndexService.updateConnectorFiltering(request, listener.map(r -> new ConnectorUpdateActionResponse(r.getResult())));
String connectorId = request.getConnectorId();
List<ConnectorFiltering> filtering = request.getFiltering();
FilteringAdvancedSnippet advancedSnippet = request.getAdvancedSnippet();
List<FilteringRule> rules = request.getRules();
// If [filtering] is not present in request body, it means that user's intention is to
// update draft's rules or advanced snippet
if (request.getFiltering() == null) {
connectorIndexService.updateConnectorFilteringDraft(
connectorId,
advancedSnippet,
rules,
listener.map(r -> new ConnectorUpdateActionResponse(r.getResult()))
);
}
// Otherwise override the whole filtering object (discouraged in docs)
else {
connectorIndexService.updateConnectorFiltering(
connectorId,
filtering,
listener.map(r -> new ConnectorUpdateActionResponse(r.getResult()))
);
}
}
}

0 comments on commit b85b9dc

Please sign in to comment.