Skip to content

Commit

Permalink
[Connectors API] Handle initialising connector default values in inde…
Browse files Browse the repository at this point in the history
…x service (#103977)
  • Loading branch information
jedrazb committed Jan 10, 2024
1 parent 53db0b3 commit 8e0de77
Show file tree
Hide file tree
Showing 21 changed files with 333 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,17 @@ setup:
service_type: super-connector

- match: { result: 'updated' }

---
'Create Connector - Invalid Index Name':
- do:
catch: "bad_request"
connector.put:
connector_id: test-connector-recreating
body:
index_name: _this-is-invalid-index-name
name: my-connector
language: pl
is_native: false
service_type: super-connector

Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,15 @@ setup:
- match: { custom_scheduling: {} }
- match: { filtering.0.domain: DEFAULT }

---
'Create Connector - Invalid Index Name':
- do:
catch: "bad_request"
connector.post:
body:
index_name: _this-is-invalid-index-name
name: my-connector
language: pl
is_native: false
service_type: super-connector

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ setup:
- skip:
version: " - 8.11.99"
reason: Introduced in 8.12.0

features: is_after
- do:
connector.put:
connector_id: test-connector
Expand All @@ -25,6 +25,19 @@ setup:
connector_id: test-connector

- exists: last_seen
- set: { last_seen: last_seen_before_check_in }

- do:
connector.check_in:
connector_id: test-connector

- match: { result: updated }

- do:
connector.get:
connector_id: test-connector

- is_after: { last_seen: $last_seen_before_check_in }

---
"Connector Check-in Error - Connector doesn't exist":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,22 +147,22 @@ private Connector(
) {
this.connectorId = connectorId;
this.apiKeyId = apiKeyId;
this.configuration = Objects.requireNonNull(configuration, "[configuration] cannot be null");
this.customScheduling = Objects.requireNonNull(customScheduling, "[custom_scheduling] cannot be null");
this.configuration = configuration;
this.customScheduling = customScheduling;
this.description = description;
this.error = error;
this.features = features;
this.filtering = Objects.requireNonNull(filtering, "[filtering] cannot be null");
this.filtering = filtering;
this.indexName = indexName;
this.isNative = isNative;
this.language = language;
this.lastSeen = lastSeen;
this.syncInfo = syncInfo;
this.name = name;
this.pipeline = pipeline;
this.scheduling = Objects.requireNonNull(scheduling, "[scheduling] cannot be null");
this.scheduling = scheduling;
this.serviceType = serviceType;
this.status = Objects.requireNonNull(status, "[status] cannot be null");
this.status = status;
this.syncCursor = syncCursor;
this.syncNow = syncNow;
}
Expand Down Expand Up @@ -549,19 +549,19 @@ public static class Builder {
private String description;
private String error;
private ConnectorFeatures features;
private List<ConnectorFiltering> filtering = List.of(ConnectorFiltering.getDefaultConnectorFilteringConfig());
private List<ConnectorFiltering> filtering;
private String indexName;
private boolean isNative = false;
private boolean isNative;
private String language;
private Instant lastSeen;
private ConnectorSyncInfo syncInfo = new ConnectorSyncInfo.Builder().build();
private String name;
private ConnectorIngestPipeline pipeline;
private ConnectorScheduling scheduling = ConnectorScheduling.getDefaultConnectorScheduling();
private ConnectorScheduling scheduling;
private String serviceType;
private ConnectorStatus status = ConnectorStatus.CREATED;
private Object syncCursor;
private boolean syncNow = false;
private boolean syncNow;

public Builder setConnectorId(String connectorId) {
this.connectorId = connectorId;
Expand Down Expand Up @@ -610,9 +610,6 @@ public Builder setIndexName(String indexName) {

public Builder setIsNative(boolean isNative) {
this.isNative = isNative;
if (isNative) {
this.status = ConnectorStatus.NEEDS_CONFIGURATION;
}
return this;
}

Expand All @@ -632,7 +629,7 @@ public Builder setSyncInfo(ConnectorSyncInfo syncInfo) {
}

public Builder setName(String name) {
this.name = Objects.requireNonNullElse(name, "");
this.name = name;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,21 @@
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.action.PostConnectorAction;
import org.elasticsearch.xpack.application.connector.action.PutConnectorAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorConfigurationAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorErrorAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSeenAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorNameAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;

import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -67,16 +69,25 @@ public ConnectorIndexService(Client client) {
}

/**
* Creates or updates the {@link Connector} in the underlying index.
* Creates or updates the {@link Connector} in the underlying index with a specific doc ID.
*
* @param docId The ID of the connector.
* @param connector The connector object.
* @param request Request for creating the connector.
* @param listener The action listener to invoke on response/failure.
*/
public void putConnector(String docId, Connector connector, ActionListener<DocWriteResponse> listener) {
public void createConnectorWithDocId(PutConnectorAction.Request request, ActionListener<DocWriteResponse> listener) {

Connector connector = createConnectorWithDefaultValues(
request.getDescription(),
request.getIndexName(),
request.getIsNative(),
request.getLanguage(),
request.getName(),
request.getServiceType()
);

try {
final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(docId)
.id(request.getConnectorId())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(connector.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS));
clientWithOrigin.index(indexRequest, listener);
Expand All @@ -86,13 +97,25 @@ public void putConnector(String docId, Connector connector, ActionListener<DocWr
}

/**
* Creates or updates the {@link Connector} in the underlying index.
* Assigns connector an auto-generated doc ID.
* Creates or updates the {@link Connector} in the underlying index with an auto-generated doc ID.
*
* @param connector The connector object.
* @param request Request for creating the connector.
* @param listener The action listener to invoke on response/failure.
*/
public void postConnector(Connector connector, ActionListener<PostConnectorAction.Response> listener) {
public void createConnectorWithAutoGeneratedId(
PostConnectorAction.Request request,
ActionListener<PostConnectorAction.Response> listener
) {

Connector connector = createConnectorWithDefaultValues(
request.getDescription(),
request.getIndexName(),
request.getIsNative(),
request.getLanguage(),
request.getName(),
request.getServiceType()
);

try {
final IndexRequest indexRequest = new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
Expand All @@ -107,6 +130,43 @@ public void postConnector(Connector connector, ActionListener<PostConnectorActio
}
}

/**
* Creates a Connector with default values and specified parameters.
*
* @param description The description of the connector.
* @param indexName The name of the index associated with the connector.
* @param isNative Flag indicating if the connector is native; defaults to false if null.
* @param language The language supported by the connector.
* @param name The name of the connector; defaults to an empty string if null.
* @param serviceType The type of service the connector integrates with.
* @return A new instance of Connector with the specified values and default settings.
*/
private Connector createConnectorWithDefaultValues(
String description,
String indexName,
Boolean isNative,
String language,
String name,
String serviceType
) {
boolean isNativeConnector = Objects.requireNonNullElse(isNative, false);
ConnectorStatus status = isNativeConnector ? ConnectorStatus.NEEDS_CONFIGURATION : ConnectorStatus.CREATED;

return new Connector.Builder().setConfiguration(Collections.emptyMap())
.setCustomScheduling(Collections.emptyMap())
.setDescription(description)
.setFiltering(List.of(ConnectorFiltering.getDefaultConnectorFilteringConfig()))
.setIndexName(indexName)
.setIsNative(isNativeConnector)
.setLanguage(language)
.setSyncInfo(new ConnectorSyncInfo.Builder().build())
.setName(Objects.requireNonNullElse(name, ""))
.setScheduling(ConnectorScheduling.getDefaultConnectorScheduling())
.setServiceType(serviceType)
.setStatus(status)
.build();
}

/**
* Gets the {@link Connector} from the underlying index.
*
Expand Down Expand Up @@ -255,7 +315,7 @@ public void updateConnectorError(UpdateConnectorErrorAction.Request request, Act
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
.source(Map.of(Connector.ERROR_FIELD.getPreferredName(), request.getError()))
);
clientWithOrigin.update(
updateRequest,
Expand All @@ -281,6 +341,7 @@ public void updateConnectorError(UpdateConnectorErrorAction.Request request, Act
public void updateConnectorNameOrDescription(UpdateConnectorNameAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();

final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
Expand Down Expand Up @@ -315,7 +376,7 @@ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request requ
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
.source(Map.of(Connector.FILTERING_FIELD.getPreferredName(), request.getFiltering()))
);
clientWithOrigin.update(
updateRequest,
Expand All @@ -335,17 +396,16 @@ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request requ
/**
* Updates the lastSeen property of a {@link Connector}.
*
* @param request The request for updating the connector's lastSeen status.
* @param listener The listener for handling responses, including successful updates or errors.
* @param connectorId The id of the connector object.
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request request, ActionListener<UpdateResponse> listener) {
public void checkInConnector(String connectorId, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
.source(Map.of(Connector.LAST_SEEN_FIELD.getPreferredName(), Instant.now()))
);
clientWithOrigin.update(
updateRequest,
Expand Down Expand Up @@ -405,6 +465,7 @@ public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request reques
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(Map.of(Connector.PIPELINE_FIELD.getPreferredName(), request.getPipeline()))
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
Expand Down Expand Up @@ -435,7 +496,7 @@ public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request re
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
.source(Map.of(Connector.SCHEDULING_FIELD.getPreferredName(), request.getScheduling()))
);
clientWithOrigin.update(
updateRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentObject;
Expand All @@ -29,6 +32,7 @@
import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;

Expand Down Expand Up @@ -135,7 +139,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

@Override
public ActionRequestValidationException validate() {
return null;
ActionRequestValidationException validationException = null;

if (Strings.isNullOrEmpty(getIndexName())) {
validationException = addValidationError("[index_name] cannot be [null] or [\"\"]", validationException);
}
try {
MetadataCreateIndexService.validateIndexOrAliasName(getIndexName(), InvalidIndexNameException::new);
} catch (InvalidIndexNameException e) {
validationException = addValidationError(e.toString(), validationException);
}

return validationException;
}

@Override
Expand Down

0 comments on commit 8e0de77

Please sign in to comment.