Skip to content

Commit

Permalink
[Connectors] Use Connector API to cancel all syncs (#183924)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedrazb committed May 27, 2024
1 parent 2b4a2ea commit a4f468b
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 126 deletions.
93 changes: 33 additions & 60 deletions packages/kbn-search-connectors/lib/cancel_syncs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,84 +8,57 @@

import { ElasticsearchClient } from '@kbn/core/server';

import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '..';
import { SyncStatus } from '../types/connectors';

import { cancelSyncs } from './cancel_syncs';
import { fetchSyncJobs } from './fetch_sync_jobs';

jest.mock('./fetch_sync_jobs', () => ({
fetchSyncJobs: jest.fn(),
}));

describe('cancelSync lib function', () => {
const mockClient = {
update: jest.fn(),
updateByQuery: jest.fn(),
transport: {
request: jest.fn(),
},
};

beforeEach(() => {
jest.clearAllMocks();
const now = new Date('2022-05-22T10:10:11.111Z');
jest.spyOn(Date, 'now').mockImplementation(() => now.getTime());
});

it('should call updateByQuery to cancel syncs', async () => {
mockClient.updateByQuery.mockImplementation(() => ({ _id: 'fakeId' }));
it('should call /_cancel endpoint to cancel syncs', async () => {
(fetchSyncJobs as jest.Mock)
.mockResolvedValueOnce({
data: [{ id: 'job_1' }, { id: 'job_2' }],
})
.mockResolvedValueOnce({ data: [] })
.mockResolvedValueOnce({ data: [{ id: 'job_3' }] });

await expect(
cancelSyncs(mockClient as unknown as ElasticsearchClient, 'connectorId')
).resolves.toEqual(undefined);
expect(mockClient.updateByQuery).toHaveBeenCalledTimes(2);
expect(mockClient.updateByQuery).toHaveBeenCalledWith({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
must: [
{
term: {
'connector.id': 'connectorId',
},
},
{
terms: {
status: [SyncStatus.PENDING, SyncStatus.SUSPENDED],
},
},
],
},
},
script: {
lang: 'painless',
source: `ctx._source['status'] = '${SyncStatus.CANCELED}';
ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}';
ctx._source['canceled_at'] = '${new Date(Date.now()).toISOString()}';
ctx._source['completed_at'] = '${new Date(Date.now()).toISOString()}';`,
},
expect(fetchSyncJobs).toHaveBeenCalledTimes(3);
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: '/_connector/_sync_job/job_1/_cancel',
});
expect(mockClient.updateByQuery).toHaveBeenCalledWith({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
must: [
{
term: {
'connector.id': 'connectorId',
},
},
{
terms: {
status: [SyncStatus.IN_PROGRESS],
},
},
],
},
},
script: {
lang: 'painless',
source: `ctx._source['status'] = '${SyncStatus.CANCELING}';
ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}';`,
},
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: '/_connector/_sync_job/job_2/_cancel',
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: '/_connector/_sync_job/job_3/_cancel',
});
await expect(mockClient.update).toHaveBeenCalledWith({
doc: { last_sync_status: SyncStatus.CANCELED, sync_now: false },
id: 'connectorId',
index: CONNECTORS_INDEX,
await expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'PUT',
path: '/_connector/connectorId/_last_sync',
body: {
last_access_control_sync_status: SyncStatus.CANCELED,
last_sync_status: SyncStatus.CANCELED,
},
});
});
});
82 changes: 19 additions & 63 deletions packages/kbn-search-connectors/lib/cancel_syncs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,75 +7,31 @@
*/

import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { asyncForEach } from '@kbn/std';

import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '..';
import { fetchSyncJobs, cancelSync } from '..';
import { SyncStatus } from '../types/connectors';
import { isIndexNotFoundException } from '../utils/identify_exceptions';

export const cancelSyncs = async (
client: ElasticsearchClient,
connectorId: string
): Promise<void> => {
try {
await client.updateByQuery({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
must: [
{
term: {
'connector.id': connectorId,
},
},
{
terms: {
status: [SyncStatus.PENDING, SyncStatus.SUSPENDED],
},
},
],
},
},
script: {
lang: 'painless',
source: `ctx._source['status'] = '${SyncStatus.CANCELED}';
ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}';
ctx._source['canceled_at'] = '${new Date(Date.now()).toISOString()}';
ctx._source['completed_at'] = '${new Date(Date.now()).toISOString()}';`,
},
});
await client.updateByQuery({
index: CONNECTORS_JOBS_INDEX,
query: {
bool: {
must: [
{
term: {
'connector.id': connectorId,
},
},
{
terms: {
status: [SyncStatus.IN_PROGRESS],
},
},
],
},
},
script: {
lang: 'painless',
source: `ctx._source['status'] = '${SyncStatus.CANCELING}';
ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}';`,
},
});
await client.update({
doc: { last_sync_status: SyncStatus.CANCELED, sync_now: false },
id: connectorId,
index: CONNECTORS_INDEX,
});
} catch (error) {
if (isIndexNotFoundException(error)) {
return;
await asyncForEach(
[SyncStatus.PENDING, SyncStatus.IN_PROGRESS, SyncStatus.SUSPENDED],
async (status) => {
const syncJobsToCancel = await fetchSyncJobs(client, connectorId, 0, 1000, 'all', status);
await asyncForEach(syncJobsToCancel.data, async (syncJob) => {
await cancelSync(client, syncJob.id);
});
}
throw error;
}
);

return await client.transport.request({
method: 'PUT',
path: `/_connector/${connectorId}/_last_sync`,
body: {
last_access_control_sync_status: SyncStatus.CANCELED,
last_sync_status: SyncStatus.CANCELED,
},
});
};
26 changes: 26 additions & 0 deletions packages/kbn-search-connectors/lib/fetch_sync_jobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* Side Public License, v 1.
*/

import { SyncStatus } from '../types';
import { fetchSyncJobs } from './fetch_sync_jobs';

describe('fetchSyncJobs lib', () => {
Expand Down Expand Up @@ -41,5 +42,30 @@ describe('fetchSyncJobs lib', () => {
querystring: 'from=0&size=10&connector_id=id&job_type=full,incremental',
});
});

it('should fetch sync jobs by status', async () => {
mockClient.transport.request.mockImplementationOnce(() => ({
count: 22,
results: [],
}));
await expect(
fetchSyncJobs(mockClient as any, 'id', 0, 10, 'content', SyncStatus.IN_PROGRESS)
).resolves.toEqual({
_meta: {
page: {
from: 0,
has_more_hits_than_total: true,
size: 10,
total: 22,
},
},
data: [],
});
expect(mockClient.transport.request).toHaveBeenCalledWith({
method: 'GET',
path: '/_connector/_sync_job',
querystring: 'from=0&size=10&connector_id=id&job_type=full,incremental&status=in_progress',
});
});
});
});
7 changes: 4 additions & 3 deletions packages/kbn-search-connectors/lib/fetch_sync_jobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,22 @@
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';

import { ConnectorsAPISyncJobResponse } from '..';
import { ConnectorSyncJob } from '../types/connectors';
import { ConnectorSyncJob, SyncStatus } from '../types/connectors';
import { Paginate } from '../types/pagination';

export const fetchSyncJobs = async (
client: ElasticsearchClient,
connectorId?: string,
from: number = 0,
size: number = 100,
syncJobType: 'content' | 'access_control' | 'all' = 'all'
syncJobType: 'content' | 'access_control' | 'all' = 'all',
syncStatus?: SyncStatus
): Promise<Paginate<ConnectorSyncJob>> => {
const querystring = `from=${from}&size=${size}${
connectorId ? '&connector_id=' + connectorId : ''
}${syncJobType === 'content' ? '&job_type=full,incremental' : ''}${
syncJobType === 'access_control' ? '&job_type=access_control' : ''
}`;
}${syncStatus ? '&status=' + syncStatus : ''}`;
const result = await client.transport.request<ConnectorsAPISyncJobResponse>({
method: 'GET',
path: `/_connector/_sync_job`,
Expand Down
1 change: 1 addition & 0 deletions packages/kbn-search-connectors/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
"@kbn/config-schema",
"@kbn/i18n-react",
"@kbn/test-jest-helpers",
"@kbn/std",
]
}

0 comments on commit a4f468b

Please sign in to comment.