From a4f468b97b7981170825c37371676be2696c7f68 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Mon, 27 May 2024 10:51:56 +0200 Subject: [PATCH] [Connectors] Use Connector API to cancel all syncs (#183924) --- .../lib/cancel_syncs.test.ts | 93 +++++++------------ .../kbn-search-connectors/lib/cancel_syncs.ts | 82 ++++------------ .../lib/fetch_sync_jobs.test.ts | 26 ++++++ .../lib/fetch_sync_jobs.ts | 7 +- packages/kbn-search-connectors/tsconfig.json | 1 + 5 files changed, 83 insertions(+), 126 deletions(-) diff --git a/packages/kbn-search-connectors/lib/cancel_syncs.test.ts b/packages/kbn-search-connectors/lib/cancel_syncs.test.ts index 05c2f5fcd529d8..0fd769099a1555 100644 --- a/packages/kbn-search-connectors/lib/cancel_syncs.test.ts +++ b/packages/kbn-search-connectors/lib/cancel_syncs.test.ts @@ -8,84 +8,57 @@ import { ElasticsearchClient } from '@kbn/core/server'; -import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '..'; import { SyncStatus } from '../types/connectors'; import { cancelSyncs } from './cancel_syncs'; +import { fetchSyncJobs } from './fetch_sync_jobs'; + +jest.mock('./fetch_sync_jobs', () => ({ + fetchSyncJobs: jest.fn(), +})); describe('cancelSync lib function', () => { const mockClient = { - update: jest.fn(), - updateByQuery: jest.fn(), + transport: { + request: jest.fn(), + }, }; beforeEach(() => { jest.clearAllMocks(); - const now = new Date('2022-05-22T10:10:11.111Z'); - jest.spyOn(Date, 'now').mockImplementation(() => now.getTime()); }); - it('should call updateByQuery to cancel syncs', async () => { - mockClient.updateByQuery.mockImplementation(() => ({ _id: 'fakeId' })); + it('should call /_cancel endpoint to cancel syncs', async () => { + (fetchSyncJobs as jest.Mock) + .mockResolvedValueOnce({ + data: [{ id: 'job_1' }, { id: 'job_2' }], + }) + .mockResolvedValueOnce({ data: [] }) + .mockResolvedValueOnce({ data: [{ id: 'job_3' }] }); await expect( cancelSyncs(mockClient as unknown as ElasticsearchClient, 'connectorId') ).resolves.toEqual(undefined); - expect(mockClient.updateByQuery).toHaveBeenCalledTimes(2); - expect(mockClient.updateByQuery).toHaveBeenCalledWith({ - index: CONNECTORS_JOBS_INDEX, - query: { - bool: { - must: [ - { - term: { - 'connector.id': 'connectorId', - }, - }, - { - terms: { - status: [SyncStatus.PENDING, SyncStatus.SUSPENDED], - }, - }, - ], - }, - }, - script: { - lang: 'painless', - source: `ctx._source['status'] = '${SyncStatus.CANCELED}'; -ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}'; -ctx._source['canceled_at'] = '${new Date(Date.now()).toISOString()}'; -ctx._source['completed_at'] = '${new Date(Date.now()).toISOString()}';`, - }, + expect(fetchSyncJobs).toHaveBeenCalledTimes(3); + expect(mockClient.transport.request).toHaveBeenCalledWith({ + method: 'PUT', + path: '/_connector/_sync_job/job_1/_cancel', }); - expect(mockClient.updateByQuery).toHaveBeenCalledWith({ - index: CONNECTORS_JOBS_INDEX, - query: { - bool: { - must: [ - { - term: { - 'connector.id': 'connectorId', - }, - }, - { - terms: { - status: [SyncStatus.IN_PROGRESS], - }, - }, - ], - }, - }, - script: { - lang: 'painless', - source: `ctx._source['status'] = '${SyncStatus.CANCELING}'; -ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}';`, - }, + expect(mockClient.transport.request).toHaveBeenCalledWith({ + method: 'PUT', + path: '/_connector/_sync_job/job_2/_cancel', + }); + expect(mockClient.transport.request).toHaveBeenCalledWith({ + method: 'PUT', + path: '/_connector/_sync_job/job_3/_cancel', }); - await expect(mockClient.update).toHaveBeenCalledWith({ - doc: { last_sync_status: SyncStatus.CANCELED, sync_now: false }, - id: 'connectorId', - index: CONNECTORS_INDEX, + await expect(mockClient.transport.request).toHaveBeenCalledWith({ + method: 'PUT', + path: '/_connector/connectorId/_last_sync', + body: { + last_access_control_sync_status: SyncStatus.CANCELED, + last_sync_status: SyncStatus.CANCELED, + }, }); }); }); diff --git a/packages/kbn-search-connectors/lib/cancel_syncs.ts b/packages/kbn-search-connectors/lib/cancel_syncs.ts index c706dd7d311e3c..df66ceee5864e3 100644 --- a/packages/kbn-search-connectors/lib/cancel_syncs.ts +++ b/packages/kbn-search-connectors/lib/cancel_syncs.ts @@ -7,75 +7,31 @@ */ import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { asyncForEach } from '@kbn/std'; -import { CONNECTORS_INDEX, CONNECTORS_JOBS_INDEX } from '..'; +import { fetchSyncJobs, cancelSync } from '..'; import { SyncStatus } from '../types/connectors'; -import { isIndexNotFoundException } from '../utils/identify_exceptions'; export const cancelSyncs = async ( client: ElasticsearchClient, connectorId: string ): Promise => { - try { - await client.updateByQuery({ - index: CONNECTORS_JOBS_INDEX, - query: { - bool: { - must: [ - { - term: { - 'connector.id': connectorId, - }, - }, - { - terms: { - status: [SyncStatus.PENDING, SyncStatus.SUSPENDED], - }, - }, - ], - }, - }, - script: { - lang: 'painless', - source: `ctx._source['status'] = '${SyncStatus.CANCELED}'; -ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}'; -ctx._source['canceled_at'] = '${new Date(Date.now()).toISOString()}'; -ctx._source['completed_at'] = '${new Date(Date.now()).toISOString()}';`, - }, - }); - await client.updateByQuery({ - index: CONNECTORS_JOBS_INDEX, - query: { - bool: { - must: [ - { - term: { - 'connector.id': connectorId, - }, - }, - { - terms: { - status: [SyncStatus.IN_PROGRESS], - }, - }, - ], - }, - }, - script: { - lang: 'painless', - source: `ctx._source['status'] = '${SyncStatus.CANCELING}'; -ctx._source['cancelation_requested_at'] = '${new Date(Date.now()).toISOString()}';`, - }, - }); - await client.update({ - doc: { last_sync_status: SyncStatus.CANCELED, sync_now: false }, - id: connectorId, - index: CONNECTORS_INDEX, - }); - } catch (error) { - if (isIndexNotFoundException(error)) { - return; + await asyncForEach( + [SyncStatus.PENDING, SyncStatus.IN_PROGRESS, SyncStatus.SUSPENDED], + async (status) => { + const syncJobsToCancel = await fetchSyncJobs(client, connectorId, 0, 1000, 'all', status); + await asyncForEach(syncJobsToCancel.data, async (syncJob) => { + await cancelSync(client, syncJob.id); + }); } - throw error; - } + ); + + return await client.transport.request({ + method: 'PUT', + path: `/_connector/${connectorId}/_last_sync`, + body: { + last_access_control_sync_status: SyncStatus.CANCELED, + last_sync_status: SyncStatus.CANCELED, + }, + }); }; diff --git a/packages/kbn-search-connectors/lib/fetch_sync_jobs.test.ts b/packages/kbn-search-connectors/lib/fetch_sync_jobs.test.ts index c22b2ab19eef9b..248c163f5dadbf 100644 --- a/packages/kbn-search-connectors/lib/fetch_sync_jobs.test.ts +++ b/packages/kbn-search-connectors/lib/fetch_sync_jobs.test.ts @@ -6,6 +6,7 @@ * Side Public License, v 1. */ +import { SyncStatus } from '../types'; import { fetchSyncJobs } from './fetch_sync_jobs'; describe('fetchSyncJobs lib', () => { @@ -41,5 +42,30 @@ describe('fetchSyncJobs lib', () => { querystring: 'from=0&size=10&connector_id=id&job_type=full,incremental', }); }); + + it('should fetch sync jobs by status', async () => { + mockClient.transport.request.mockImplementationOnce(() => ({ + count: 22, + results: [], + })); + await expect( + fetchSyncJobs(mockClient as any, 'id', 0, 10, 'content', SyncStatus.IN_PROGRESS) + ).resolves.toEqual({ + _meta: { + page: { + from: 0, + has_more_hits_than_total: true, + size: 10, + total: 22, + }, + }, + data: [], + }); + expect(mockClient.transport.request).toHaveBeenCalledWith({ + method: 'GET', + path: '/_connector/_sync_job', + querystring: 'from=0&size=10&connector_id=id&job_type=full,incremental&status=in_progress', + }); + }); }); }); diff --git a/packages/kbn-search-connectors/lib/fetch_sync_jobs.ts b/packages/kbn-search-connectors/lib/fetch_sync_jobs.ts index d278c97c84f6b6..fec6be0cc7eb2d 100644 --- a/packages/kbn-search-connectors/lib/fetch_sync_jobs.ts +++ b/packages/kbn-search-connectors/lib/fetch_sync_jobs.ts @@ -9,7 +9,7 @@ import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import { ConnectorsAPISyncJobResponse } from '..'; -import { ConnectorSyncJob } from '../types/connectors'; +import { ConnectorSyncJob, SyncStatus } from '../types/connectors'; import { Paginate } from '../types/pagination'; export const fetchSyncJobs = async ( @@ -17,13 +17,14 @@ export const fetchSyncJobs = async ( connectorId?: string, from: number = 0, size: number = 100, - syncJobType: 'content' | 'access_control' | 'all' = 'all' + syncJobType: 'content' | 'access_control' | 'all' = 'all', + syncStatus?: SyncStatus ): Promise> => { const querystring = `from=${from}&size=${size}${ connectorId ? '&connector_id=' + connectorId : '' }${syncJobType === 'content' ? '&job_type=full,incremental' : ''}${ syncJobType === 'access_control' ? '&job_type=access_control' : '' - }`; + }${syncStatus ? '&status=' + syncStatus : ''}`; const result = await client.transport.request({ method: 'GET', path: `/_connector/_sync_job`, diff --git a/packages/kbn-search-connectors/tsconfig.json b/packages/kbn-search-connectors/tsconfig.json index eb7decb3d1e00e..cb54e57748e948 100644 --- a/packages/kbn-search-connectors/tsconfig.json +++ b/packages/kbn-search-connectors/tsconfig.json @@ -24,5 +24,6 @@ "@kbn/config-schema", "@kbn/i18n-react", "@kbn/test-jest-helpers", + "@kbn/std", ] }