Skip to content

Commit

Permalink
[Connector API] Implement update last_sync endpoint (#102858)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb committed Dec 4, 2023
1 parent 9018f58 commit 759280c
Show file tree
Hide file tree
Showing 14 changed files with 669 additions and 62 deletions.
@@ -0,0 +1,39 @@
{
"connector.last_sync": {
"documentation": {
"url": "https://www.elastic.co/guide/en/enterprise-search/current/connectors.html",
"description": "Updates the stats of last sync in the connector document."
},
"stability": "experimental",
"visibility": "feature_flag",
"feature_flag": "es.connector_api_feature_flag_enabled",
"headers": {
"accept": [
"application/json"
],
"content_type": [
"application/json"
]
},
"url": {
"paths": [
{
"path": "/_connector/{connector_id}/_last_sync",
"methods": [
"PUT"
],
"parts": {
"connector_id": {
"type": "string",
"description": "The unique identifier of the connector to be updated."
}
}
}
]
},
"body": {
"description": "Object with stats related to the last connector sync run.",
"required": true
}
}
}
@@ -0,0 +1,62 @@
setup:
- skip:
version: " - 8.11.99"
reason: Introduced in 8.12.0

- do:
connector.put:
connector_id: test-connector
body:
index_name: search-1-test
name: my-connector
language: pl
is_native: false
service_type: super-connector
---
"Update Connector Last Sync Stats":
- do:
connector.last_sync:
connector_id: test-connector
body:
last_sync_error: "oh no error"
last_access_control_sync_scheduled_at: "2023-05-25T12:30:00.000Z"

- match: { result: updated }

- do:
connector.get:
connector_id: test-connector

- match: { last_sync_error: "oh no error" }
- match: { last_access_control_sync_scheduled_at: "2023-05-25T12:30:00.000Z" }

---
"Update Connector Last Sync Stats - Connector doesn't exist":
- do:
catch: "missing"
connector.last_sync:
connector_id: test-non-existent-connector
body:
last_sync_error: "oh no error"
last_access_control_sync_scheduled_at: "2023-05-25T12:30:00.000Z"

---
"Update Connector Filtering - Wrong datetime expression":
- do:
catch: "bad_request"
connector.last_sync:
connector_id: test-connector
body:
last_access_control_sync_scheduled_at: "this is not a timestamp"


---
"Update Connector Filtering - Wrong status":
- do:
catch: "bad_request"
connector.last_sync:
connector_id: test-connector
body:
last_sync_status: "this is not a valid status"


Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.xpack.application.connector.action.RestPutConnectorAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorLastSeenAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorLastSyncStatsAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.RestUpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.TransportDeleteConnectorAction;
Expand All @@ -60,10 +61,12 @@
import org.elasticsearch.xpack.application.connector.action.TransportPutConnectorAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorLastSeenAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorLastSyncStatsAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.TransportUpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSeenAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;
import org.elasticsearch.xpack.application.connector.syncjob.action.CancelConnectorSyncJobAction;
Expand Down Expand Up @@ -200,6 +203,7 @@ protected XPackLicenseState getLicenseState() {
new ActionHandler<>(PutConnectorAction.INSTANCE, TransportPutConnectorAction.class),
new ActionHandler<>(UpdateConnectorFilteringAction.INSTANCE, TransportUpdateConnectorFilteringAction.class),
new ActionHandler<>(UpdateConnectorLastSeenAction.INSTANCE, TransportUpdateConnectorLastSeenAction.class),
new ActionHandler<>(UpdateConnectorLastSyncStatsAction.INSTANCE, TransportUpdateConnectorLastSyncStatsAction.class),
new ActionHandler<>(UpdateConnectorPipelineAction.INSTANCE, TransportUpdateConnectorPipelineAction.class),
new ActionHandler<>(UpdateConnectorSchedulingAction.INSTANCE, TransportUpdateConnectorSchedulingAction.class),

Expand Down Expand Up @@ -265,6 +269,7 @@ public List<RestHandler> getRestHandlers(
new RestPutConnectorAction(),
new RestUpdateConnectorFilteringAction(),
new RestUpdateConnectorLastSeenAction(),
new RestUpdateConnectorLastSyncStatsAction(),
new RestUpdateConnectorPipelineAction(),
new RestUpdateConnectorSchedulingAction(),

Expand Down
Expand Up @@ -287,45 +287,45 @@ public Connector(StreamInput in) throws IOException {
ObjectParser.ValueType.STRING_OR_NULL
);

PARSER.declareString(optionalConstructorArg(), ConnectorSyncInfo.LAST_ACCESS_CONTROL_SYNC_ERROR);
PARSER.declareStringOrNull(optionalConstructorArg(), ConnectorSyncInfo.LAST_ACCESS_CONTROL_SYNC_ERROR);
PARSER.declareField(
optionalConstructorArg(),
(p, c) -> Instant.parse(p.text()),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL ? null : Instant.parse(p.text()),
ConnectorSyncInfo.LAST_ACCESS_CONTROL_SYNC_SCHEDULED_AT_FIELD,
ObjectParser.ValueType.STRING
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareField(
optionalConstructorArg(),
(p, c) -> ConnectorSyncStatus.connectorSyncStatus(p.text()),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL ? null : ConnectorSyncStatus.connectorSyncStatus(p.text()),
ConnectorSyncInfo.LAST_ACCESS_CONTROL_SYNC_STATUS_FIELD,
ObjectParser.ValueType.STRING
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareLong(optionalConstructorArg(), ConnectorSyncInfo.LAST_DELETED_DOCUMENT_COUNT_FIELD);
PARSER.declareField(
optionalConstructorArg(),
(p, c) -> Instant.parse(p.text()),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL ? null : Instant.parse(p.text()),
ConnectorSyncInfo.LAST_INCREMENTAL_SYNC_SCHEDULED_AT_FIELD,
ObjectParser.ValueType.STRING
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareLong(optionalConstructorArg(), ConnectorSyncInfo.LAST_INDEXED_DOCUMENT_COUNT_FIELD);
PARSER.declareString(optionalConstructorArg(), ConnectorSyncInfo.LAST_SYNC_ERROR_FIELD);
PARSER.declareStringOrNull(optionalConstructorArg(), ConnectorSyncInfo.LAST_SYNC_ERROR_FIELD);
PARSER.declareField(
optionalConstructorArg(),
(p, c) -> Instant.parse(p.text()),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL ? null : Instant.parse(p.text()),
ConnectorSyncInfo.LAST_SYNC_SCHEDULED_AT_FIELD,
ObjectParser.ValueType.STRING
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareField(
optionalConstructorArg(),
(p, c) -> ConnectorSyncStatus.connectorSyncStatus(p.text()),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL ? null : ConnectorSyncStatus.connectorSyncStatus(p.text()),
ConnectorSyncInfo.LAST_SYNC_STATUS_FIELD,
ObjectParser.ValueType.STRING
ObjectParser.ValueType.STRING_OR_NULL
);
PARSER.declareField(
optionalConstructorArg(),
(p, c) -> Instant.parse(p.text()),
(p, c) -> p.currentToken() == XContentParser.Token.VALUE_NULL ? null : Instant.parse(p.text()),
ConnectorSyncInfo.LAST_SYNCED_FIELD,
ObjectParser.ValueType.STRING
ObjectParser.ValueType.STRING_OR_NULL
);

PARSER.declareString(optionalConstructorArg(), NAME_FIELD);
Expand Down Expand Up @@ -485,6 +485,10 @@ public Map<String, Object> getConfiguration() {
return configuration;
}

public ConnectorSyncInfo getSyncInfo() {
return syncInfo;
}

public Instant getLastSeen() {
return lastSeen;
}
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.xcontent.XContentType;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorFilteringAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSeenAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorLastSyncStatsAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorPipelineAction;
import org.elasticsearch.xpack.application.connector.action.UpdateConnectorSchedulingAction;

Expand Down Expand Up @@ -203,12 +204,42 @@ public void updateConnectorFiltering(UpdateConnectorFilteringAction.Request requ
}

/**
* Updates the {@link ConnectorIngestPipeline} property of a {@link Connector}.
* Updates the lastSeen property of a {@link Connector}.
*
* @param request Request for updating connector ingest pipeline property.
* @param request The request for updating the connector's lastSeen status.
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
new IndexRequest(CONNECTOR_INDEX_NAME).opType(DocWriteRequest.OpType.INDEX)
.id(connectorId)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(request.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS))
);
clientWithOrigin.update(
updateRequest,
new DelegatingIndexNotFoundActionListener<>(connectorId, listener, (l, updateResponse) -> {
if (updateResponse.getResult() == UpdateResponse.Result.NOT_FOUND) {
l.onFailure(new ResourceNotFoundException(connectorId));
return;
}
l.onResponse(updateResponse);
})
);
} catch (Exception e) {
listener.onFailure(e);
}
}

/**
* Updates the {@link ConnectorSyncInfo} properties in a {@link Connector}.
*
* @param request Request for updating connector last sync stats properties.
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request request, ActionListener<UpdateResponse> listener) {
public void updateConnectorLastSyncStats(UpdateConnectorLastSyncStatsAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
Expand All @@ -233,12 +264,12 @@ public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request reques
}

/**
* Updates the {@link ConnectorScheduling} property of a {@link Connector}.
* Updates the {@link ConnectorIngestPipeline} property of a {@link Connector}.
*
* @param request The request for updating the connector's scheduling.
* @param listener The listener for handling responses, including successful updates or errors.
* @param request Request for updating connector ingest pipeline property.
* @param listener Listener to respond to a successful response or an error.
*/
public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request request, ActionListener<UpdateResponse> listener) {
public void updateConnectorPipeline(UpdateConnectorPipelineAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
Expand All @@ -263,12 +294,12 @@ public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request re
}

/**
* Updates the lastSeen property of a {@link Connector}.
* Updates the {@link ConnectorScheduling} property of a {@link Connector}.
*
* @param request The request for updating the connector's lastSeen status.
* @param request The request for updating the connector's scheduling.
* @param listener The listener for handling responses, including successful updates or errors.
*/
public void updateConnectorLastSeen(UpdateConnectorLastSeenAction.Request request, ActionListener<UpdateResponse> listener) {
public void updateConnectorScheduling(UpdateConnectorSchedulingAction.Request request, ActionListener<UpdateResponse> listener) {
try {
String connectorId = request.getConnectorId();
final UpdateRequest updateRequest = new UpdateRequest(CONNECTOR_INDEX_NAME, connectorId).doc(
Expand Down
Expand Up @@ -90,51 +90,33 @@ public ConnectorSyncInfo(StreamInput in) throws IOException {
this.lastSynced = in.readOptionalInstant();
}

static final ParseField LAST_ACCESS_CONTROL_SYNC_ERROR = new ParseField("last_access_control_sync_error");
static final ParseField LAST_ACCESS_CONTROL_SYNC_STATUS_FIELD = new ParseField("last_access_control_sync_status");
static final ParseField LAST_ACCESS_CONTROL_SYNC_SCHEDULED_AT_FIELD = new ParseField("last_access_control_sync_scheduled_at");
static final ParseField LAST_DELETED_DOCUMENT_COUNT_FIELD = new ParseField("last_deleted_document_count");
static final ParseField LAST_INCREMENTAL_SYNC_SCHEDULED_AT_FIELD = new ParseField("last_incremental_sync_scheduled_at");
static final ParseField LAST_INDEXED_DOCUMENT_COUNT_FIELD = new ParseField("last_indexed_document_count");
static final ParseField LAST_SYNC_ERROR_FIELD = new ParseField("last_sync_error");
static final ParseField LAST_SYNC_SCHEDULED_AT_FIELD = new ParseField("last_sync_scheduled_at");
static final ParseField LAST_SYNC_STATUS_FIELD = new ParseField("last_sync_status");
static final ParseField LAST_SYNCED_FIELD = new ParseField("last_synced");
public static final ParseField LAST_ACCESS_CONTROL_SYNC_ERROR = new ParseField("last_access_control_sync_error");
public static final ParseField LAST_ACCESS_CONTROL_SYNC_STATUS_FIELD = new ParseField("last_access_control_sync_status");
public static final ParseField LAST_ACCESS_CONTROL_SYNC_SCHEDULED_AT_FIELD = new ParseField("last_access_control_sync_scheduled_at");
public static final ParseField LAST_DELETED_DOCUMENT_COUNT_FIELD = new ParseField("last_deleted_document_count");
public static final ParseField LAST_INCREMENTAL_SYNC_SCHEDULED_AT_FIELD = new ParseField("last_incremental_sync_scheduled_at");
public static final ParseField LAST_INDEXED_DOCUMENT_COUNT_FIELD = new ParseField("last_indexed_document_count");
public static final ParseField LAST_SYNC_ERROR_FIELD = new ParseField("last_sync_error");
public static final ParseField LAST_SYNC_SCHEDULED_AT_FIELD = new ParseField("last_sync_scheduled_at");
public static final ParseField LAST_SYNC_STATUS_FIELD = new ParseField("last_sync_status");
public static final ParseField LAST_SYNCED_FIELD = new ParseField("last_synced");

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {

if (lastAccessControlSyncError != null) {
builder.field(LAST_ACCESS_CONTROL_SYNC_ERROR.getPreferredName(), lastAccessControlSyncError);
}
if (lastAccessControlSyncStatus != null) {
builder.field(LAST_ACCESS_CONTROL_SYNC_STATUS_FIELD.getPreferredName(), lastAccessControlSyncStatus);
}
if (lastAccessControlSyncScheduledAt != null) {
builder.field(LAST_ACCESS_CONTROL_SYNC_SCHEDULED_AT_FIELD.getPreferredName(), lastAccessControlSyncScheduledAt);
}
builder.field(LAST_ACCESS_CONTROL_SYNC_ERROR.getPreferredName(), lastAccessControlSyncError);
builder.field(LAST_ACCESS_CONTROL_SYNC_STATUS_FIELD.getPreferredName(), lastAccessControlSyncStatus);
builder.field(LAST_ACCESS_CONTROL_SYNC_SCHEDULED_AT_FIELD.getPreferredName(), lastAccessControlSyncScheduledAt);
if (lastDeletedDocumentCount != null) {
builder.field(LAST_DELETED_DOCUMENT_COUNT_FIELD.getPreferredName(), lastDeletedDocumentCount);
}
if (lastIncrementalSyncScheduledAt != null) {
builder.field(LAST_INCREMENTAL_SYNC_SCHEDULED_AT_FIELD.getPreferredName(), lastIncrementalSyncScheduledAt);
}
builder.field(LAST_INCREMENTAL_SYNC_SCHEDULED_AT_FIELD.getPreferredName(), lastIncrementalSyncScheduledAt);
if (lastIndexedDocumentCount != null) {
builder.field(LAST_INDEXED_DOCUMENT_COUNT_FIELD.getPreferredName(), lastIndexedDocumentCount);
}
if (lastSyncError != null) {
builder.field(LAST_SYNC_ERROR_FIELD.getPreferredName(), lastSyncError);
}
if (lastSyncScheduledAt != null) {
builder.field(LAST_SYNC_SCHEDULED_AT_FIELD.getPreferredName(), lastSyncScheduledAt);
}
if (lastSyncStatus != null) {
builder.field(LAST_SYNC_STATUS_FIELD.getPreferredName(), lastSyncStatus);
}
if (lastSynced != null) {
builder.field(LAST_SYNCED_FIELD.getPreferredName(), lastSynced);
}

builder.field(LAST_SYNC_ERROR_FIELD.getPreferredName(), lastSyncError);
builder.field(LAST_SYNC_SCHEDULED_AT_FIELD.getPreferredName(), lastSyncScheduledAt);
builder.field(LAST_SYNC_STATUS_FIELD.getPreferredName(), lastSyncStatus);
builder.field(LAST_SYNCED_FIELD.getPreferredName(), lastSynced);
return builder;
}

Expand Down

0 comments on commit 759280c

Please sign in to comment.