diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts index 399be22d6b67842..1d492301f45bd8a 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/create_index.ts @@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable'; import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal'; -import type { AcknowledgeResponse } from '.'; import { catchRetryableEsClientErrors, type RetryableEsClientError, @@ -46,6 +45,9 @@ export interface CreateIndexParams { aliases?: string[]; timeout?: string; } + +export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists'; + /** * Creates an index with the given mappings * @@ -64,11 +66,11 @@ export const createIndex = ({ timeout = DEFAULT_TIMEOUT, }: CreateIndexParams): TaskEither.TaskEither< RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded, - 'create_index_succeeded' + CreateIndexSuccessResponse > => { const createIndexTask: TaskEither.TaskEither< RetryableEsClientError | ClusterShardLimitExceeded, - AcknowledgeResponse + CreateIndexSuccessResponse > = () => { const aliasesObject = aliasArrayToRecord(aliases); @@ -103,31 +105,12 @@ export const createIndex = ({ }, }, }) - .then((res) => { - /** - * - acknowledged=false, we timed out before the cluster state was - * updated on all nodes with the newly created index, but it - * probably will be created sometime soon. - * - shards_acknowledged=false, we timed out before all shards were - * started - * - acknowledged=true, shards_acknowledged=true, index creation complete - */ - return Either.right({ - acknowledged: Boolean(res.acknowledged), - shardsAcknowledged: res.shards_acknowledged, - }); + .then(() => { + return Either.right('create_index_succeeded' as const); }) .catch((error) => { if (error?.body?.error?.type === 'resource_already_exists_exception') { - /** - * If the target index already exists it means a previous create - * operation had already been started. However, we can't be sure - * that all shards were started so return shardsAcknowledged: false - */ - return Either.right({ - acknowledged: true, - shardsAcknowledged: false, - }); + return Either.right('index_already_exists' as const); } else if (isClusterShardLimitExceeded(error?.body?.error)) { return Either.left({ type: 'cluster_shard_limit_exceeded' as const, @@ -143,11 +126,12 @@ export const createIndex = ({ createIndexTask, TaskEither.chain< RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded, - AcknowledgeResponse, - 'create_index_succeeded' + CreateIndexSuccessResponse, + CreateIndexSuccessResponse >((res) => { // Systematicaly wait until the target index has a 'green' status meaning // the primary (and on multi node clusters) the replica has been started + // When the index status is 'green' we know that all shards were started // see https://github.com/elastic/kibana/issues/157968 return pipe( waitForIndexStatus({ @@ -156,10 +140,7 @@ export const createIndex = ({ timeout: DEFAULT_TIMEOUT, status: 'green', }), - TaskEither.map(() => { - /** When the index status is 'green' we know that all shards were started */ - return 'create_index_succeeded'; - }) + TaskEither.map(() => res) ); }) ); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts index dbe61920b31b3e5..5af2471a7f72e85 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/index.ts @@ -106,7 +106,8 @@ export { import type { UnknownDocsFound } from './check_for_unknown_docs'; import type { IncompatibleClusterRoutingAllocation } from './initialize_action'; -import { ClusterShardLimitExceeded } from './create_index'; +import type { ClusterShardLimitExceeded } from './create_index'; +import type { SynchronizationFailed } from './synchronize_migrators'; export type { CheckForUnknownDocsParams, @@ -174,6 +175,7 @@ export interface ActionErrorTypeMap { index_not_yellow_timeout: IndexNotYellowTimeout; cluster_shard_limit_exceeded: ClusterShardLimitExceeded; es_response_too_large: EsResponseTooLargeError; + synchronization_failed: SynchronizationFailed; } /** diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts index a5a8e9c25f92916..596e17a36b98b74 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.test.ts @@ -6,38 +6,36 @@ * Side Public License, v 1. */ import { synchronizeMigrators } from './synchronize_migrators'; -import { type Defer, defer } from '../kibana_migrator_utils'; +import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils'; describe('synchronizeMigrators', () => { - let defers: Array>; - let allDefersPromise: Promise; - let migratorsDefers: Array>; + let waitGroups: Array>; + let allWaitGroupsPromise: Promise; + let migratorsWaitGroups: Array>; beforeEach(() => { jest.clearAllMocks(); - defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer); - allDefersPromise = Promise.all(defers.map(({ promise }) => promise)); + waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup); + allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise)); - migratorsDefers = defers.map(({ resolve, reject }) => ({ + migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({ resolve: jest.fn(resolve), reject: jest.fn(reject), - promise: allDefersPromise, + promise: allWaitGroupsPromise, })); }); describe('when all migrators reach the synchronization point with a correct state', () => { it('unblocks all migrators and resolves Right', async () => { - const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer)); + const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup })); const res = await Promise.all(tasks.map((task) => task())); - migratorsDefers.forEach((migratorDefer) => - expect(migratorDefer.resolve).toHaveBeenCalledTimes(1) - ); - migratorsDefers.forEach((migratorDefer) => - expect(migratorDefer.reject).not.toHaveBeenCalled() + migratorsWaitGroups.forEach((waitGroup) => + expect(waitGroup.resolve).toHaveBeenCalledTimes(1) ); + migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled()); expect(res).toEqual([ { _tag: 'Right', right: 'synchronized_successfully' }, @@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => { it('migrators are not unblocked until the last one reaches the synchronization point', async () => { let resolved: number = 0; - migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved)); - const [casesDefer, ...otherMigratorsDefers] = migratorsDefers; + migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved)); + const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups; // we simulate that only kibana_task_manager and kibana migrators get to the sync point - const tasks = otherMigratorsDefers.map((migratorDefer) => - synchronizeMigrators(migratorDefer) - ); + const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup })); // we don't await for them, or we would be locked forever Promise.all(tasks.map((task) => task())); @@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => { expect(resolved).toEqual(0); // finally, the last migrator gets to the synchronization point - await synchronizeMigrators(casesDefer)(); + await synchronizeMigrators({ waitGroup: casesDefer })(); expect(resolved).toEqual(3); }); }); @@ -75,31 +71,29 @@ describe('synchronizeMigrators', () => { it('synchronizedMigrators resolves Left for the rest of migrators', async () => { let resolved: number = 0; let errors: number = 0; - migratorsDefers.forEach((migratorDefer) => - migratorDefer.promise.then(() => ++resolved).catch(() => ++errors) + migratorsWaitGroups.forEach((waitGroup) => + waitGroup.promise.then(() => ++resolved).catch(() => ++errors) ); - const [casesDefer, ...otherMigratorsDefers] = migratorsDefers; + const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups; // we first make one random migrator fail and not reach the sync point casesDefer.reject('Oops. The cases migrator failed unexpectedly.'); // the other migrators then try to synchronize - const tasks = otherMigratorsDefers.map((migratorDefer) => - synchronizeMigrators(migratorDefer) - ); + const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup })); expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([ { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, @@ -116,15 +110,13 @@ describe('synchronizeMigrators', () => { it('synchronizedMigrators resolves Left for the rest of migrators', async () => { let resolved: number = 0; let errors: number = 0; - migratorsDefers.forEach((migratorDefer) => - migratorDefer.promise.then(() => ++resolved).catch(() => ++errors) + migratorsWaitGroups.forEach((waitGroup) => + waitGroup.promise.then(() => ++resolved).catch(() => ++errors) ); - const [casesDefer, ...otherMigratorsDefers] = migratorsDefers; + const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups; // some migrators try to synchronize - const tasks = otherMigratorsDefers.map((migratorDefer) => - synchronizeMigrators(migratorDefer) - ); + const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup })); // we then make one random migrator fail and not reach the sync point casesDefer.reject('Oops. The cases migrator failed unexpectedly.'); @@ -133,14 +125,14 @@ describe('synchronizeMigrators', () => { { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, { _tag: 'Left', left: { - type: 'sync_failed', + type: 'synchronization_failed', error: 'Oops. The cases migrator failed unexpectedly.', }, }, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts index 26763f1f51ae2f6..50090293ee1e244 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/actions/synchronize_migrators.ts @@ -8,20 +8,33 @@ import * as Either from 'fp-ts/lib/Either'; import * as TaskEither from 'fp-ts/lib/TaskEither'; -import type { Defer } from '../kibana_migrator_utils'; +import type { WaitGroup } from '../kibana_migrator_utils'; -export interface SyncFailed { - type: 'sync_failed'; +/** @internal */ +export interface SynchronizationFailed { + type: 'synchronization_failed'; error: Error; } -export function synchronizeMigrators( - defer: Defer -): TaskEither.TaskEither { +/** @internal */ +export interface SynchronizeMigratorsParams { + waitGroup: WaitGroup; + thenHook?: (res: any) => Either.Right; + payload?: T; +} + +export function synchronizeMigrators({ + waitGroup, + payload, + thenHook = () => + Either.right( + 'synchronized_successfully' as const + ) as Either.Right<'synchronized_successfully'> as unknown as Either.Right, +}: SynchronizeMigratorsParams): TaskEither.TaskEither { return () => { - defer.resolve(); - return defer.promise - .then(() => Either.right('synchronized_successfully' as const)) - .catch((error) => Either.left({ type: 'sync_failed' as const, error })); + waitGroup.resolve(payload); + return waitGroup.promise + .then((res) => (thenHook ? thenHook(res) : res)) + .catch((error) => Either.left({ type: 'synchronization_failed' as const, error })); }; } diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts index 02eb3dcdaadc15d..6921e7392fa1fb8 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.test.ts @@ -17,16 +17,16 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server'; import { loggerMock } from '@kbn/logging-mocks'; import { calculateTypeStatuses, - createMultiPromiseDefer, + createWaitGroupMap, getCurrentIndexTypesMap, getIndicesInvolvedInRelocation, indexMapToIndexTypesMap, } from './kibana_migrator_utils'; import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures'; -describe('createMultiPromiseDefer', () => { +describe('createWaitGroupMap', () => { it('creates defer objects with the same Promise', () => { - const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']); + const defers = createWaitGroupMap(['.kibana', '.kibana_cases']); expect(Object.keys(defers)).toHaveLength(2); expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise); expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve); @@ -34,7 +34,7 @@ describe('createMultiPromiseDefer', () => { }); it('the common Promise resolves when all defers resolve', async () => { - const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']); + const defers = createWaitGroupMap(['.kibana', '.kibana_cases']); let resolved = 0; Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved)); defers['.kibana'].resolve(); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts index 36d17cd1159e9bd..a646f6e36081c23 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/kibana_migrator_utils.ts @@ -15,8 +15,8 @@ import { TypeStatus, type TypeStatusDetails } from './kibana_migrator_constants' // even though this utility class is present in @kbn/kibana-utils-plugin, we can't easily import it from Core // aka. one does not simply reuse code -export class Defer { - public resolve!: (data: T) => void; +class Defer { + public resolve!: (data?: T) => void; public reject!: (error: any) => void; public promise: Promise = new Promise((resolve, reject) => { (this as any).resolve = resolve; @@ -24,12 +24,26 @@ export class Defer { }); } -export const defer = () => new Defer(); +export type WaitGroup = Defer; -export function createMultiPromiseDefer(indices: string[]): Record> { - const defers: Array> = indices.map(defer); - const all = Promise.all(defers.map(({ promise }) => promise)); - return indices.reduce>>((acc, indexName, i) => { +export function waitGroup(): WaitGroup { + return new Defer(); +} + +export function createWaitGroupMap( + keys: string[], + thenHook: (res: T[]) => U = (res) => res as unknown as U +): Record> { + if (!keys?.length) { + return {}; + } + + const defers: Array> = keys.map(() => waitGroup()); + + // every member of the WaitGroup will wait for all members to resolve + const all = Promise.all(defers.map(({ promise }) => promise)).then(thenHook); + + return keys.reduce>>((acc, indexName, i) => { const { resolve, reject } = defers[i]; acc[indexName] = { resolve, reject, promise: all }; return acc; @@ -87,7 +101,7 @@ export async function getIndicesInvolvedInRelocation({ defaultIndexTypesMap: IndexTypesMap; logger: Logger; }): Promise { - const indicesWithMovingTypesSet = new Set(); + const indicesWithRelocatingTypesSet = new Set(); const currentIndexTypesMap = await getCurrentIndexTypesMap({ client, @@ -106,11 +120,11 @@ export async function getIndicesInvolvedInRelocation({ Object.values(typeIndexDistribution) .filter(({ status }) => status === TypeStatus.Moved) .forEach(({ currentIndex, targetIndex }) => { - indicesWithMovingTypesSet.add(currentIndex!); - indicesWithMovingTypesSet.add(targetIndex!); + indicesWithRelocatingTypesSet.add(currentIndex!); + indicesWithRelocatingTypesSet.add(targetIndex!); }); - return Array.from(indicesWithMovingTypesSet); + return Array.from(indicesWithRelocatingTypesSet); } export function indexMapToIndexTypesMap(indexMap: IndexMap): IndexTypesMap { diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts index a30c90e2da5ee6c..d3e8fcabfc432a5 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/helpers.ts @@ -19,6 +19,9 @@ import type { AliasAction, FetchIndexResponse } from '../actions'; import type { BulkIndexOperationTuple } from './create_batches'; import { OutdatedDocumentsSearchRead, ReindexSourceToTempRead } from '../state'; +/** @internal */ +export const REINDEX_TEMP_SUFFIX = '_reindex_temp'; + /** @internal */ export type Aliases = Partial>; @@ -311,7 +314,7 @@ export function getMigrationType({ * @returns A temporary index name to reindex documents */ export const getTempIndexName = (indexPrefix: string, kibanaVersion: string): string => - `${indexPrefix}_${kibanaVersion}_reindex_temp`; + `${indexPrefix}_${kibanaVersion}${REINDEX_TEMP_SUFFIX}`; /** Increase batchSize by 20% until a maximum of maxBatchSize */ export const increaseBatchSize = ( diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts index f96695af4a07f7a..b6e7508770d7d0b 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.test.ts @@ -1791,7 +1791,7 @@ describe('migrations v2 model', () => { test('READY_TO_REINDEX_SYNC -> FATAL if the synchronization between migrators fails', () => { const res: ResponseType<'READY_TO_REINDEX_SYNC'> = Either.left({ - type: 'sync_failed', + type: 'synchronization_failed', error: new Error('Other migrators failed to reach the synchronization point'), }); const newState = model(state, res); @@ -2052,7 +2052,7 @@ describe('migrations v2 model', () => { }); test('DONE_REINDEXING_SYNC -> FATAL if the synchronization between migrators fails', () => { const res: ResponseType<'DONE_REINDEXING_SYNC'> = Either.left({ - type: 'sync_failed', + type: 'synchronization_failed', error: new Error('Other migrators failed to reach the synchronization point'), }); const newState = model(state, res); @@ -2971,10 +2971,10 @@ describe('migrations v2 model', () => { sourceIndex: Option.none as Option.None, targetIndex: '.kibana_7.11.0_001', }; - test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY', () => { + test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS', () => { const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded'); const newState = model(createNewTargetState, res); - expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY'); + expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); @@ -2988,7 +2988,7 @@ describe('migrations v2 model', () => { expect(newState.retryCount).toEqual(1); expect(newState.retryDelay).toEqual(2000); }); - test('CREATE_NEW_TARGET -> MARK_VERSION_INDEX_READY resets the retry count and delay', () => { + test('CREATE_NEW_TARGET -> CHECK_VERSION_INDEX_READY_ACTIONS resets the retry count and delay', () => { const res: ResponseType<'CREATE_NEW_TARGET'> = Either.right('create_index_succeeded'); const testState = { ...createNewTargetState, @@ -2997,7 +2997,7 @@ describe('migrations v2 model', () => { }; const newState = model(testState, res); - expect(newState.controlState).toEqual('MARK_VERSION_INDEX_READY'); + expect(newState.controlState).toEqual('CHECK_VERSION_INDEX_READY_ACTIONS'); expect(newState.retryCount).toEqual(0); expect(newState.retryDelay).toEqual(0); }); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts index 915fe58f6e4484e..e4f38c2a32a3c57 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/model/model.ts @@ -46,6 +46,7 @@ import { increaseBatchSize, hasLaterVersionAlias, aliasVersion, + REINDEX_TEMP_SUFFIX, } from './helpers'; import { buildTempIndexMap, createBatches } from './create_batches'; import type { MigrationLog } from '../types'; @@ -86,7 +87,7 @@ export const model = (currentState: State, resW: ResponseType): const retryErrorMessage = `[${left.type}] Incompatible Elasticsearch cluster settings detected. Remove the persistent and transient Elasticsearch cluster setting 'cluster.routing.allocation.enable' or set it to a value of 'all' to allow migrations to proceed. Refer to ${stateP.migrationDocLinks.routingAllocationDisabled} for more information on how to resolve the issue.`; return delayRetryState(stateP, retryErrorMessage, stateP.retryAttempts); } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else if (Either.isRight(res)) { // cluster routing allocation is enabled and we can continue with the migration as normal @@ -266,7 +267,7 @@ export const model = (currentState: State, resW: ResponseType): }; } } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'WAIT_FOR_MIGRATION_COMPLETION') { const res = resW as ExcludeRetryableEsError>; @@ -314,14 +315,14 @@ export const model = (currentState: State, resW: ResponseType): // If the write block failed because the index doesn't exist, it means // another instance already completed the legacy pre-migration. Proceed // to the next step. - if (isTypeof(res.left, 'index_not_found_exception')) { + const left = res.left; + if (isTypeof(left, 'index_not_found_exception')) { return { ...stateP, controlState: 'LEGACY_CREATE_REINDEX_TARGET' }; } else { - // @ts-expect-error TS doesn't correctly narrow this type to never - return throwBadResponse(stateP, res); + throwBadResponse(stateP, left); } } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'LEGACY_CREATE_REINDEX_TARGET') { const res = resW as ExcludeRetryableEsError>; @@ -343,7 +344,7 @@ export const model = (currentState: State, resW: ResponseType): reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`, }; } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else if (Either.isRight(res)) { return { @@ -476,10 +477,10 @@ export const model = (currentState: State, resW: ResponseType): const retryErrorMessage = `${left.message} Refer to ${stateP.migrationDocLinks.repeatedTimeoutRequests} for information on how to resolve the issue.`; return delayRetryState(stateP, retryErrorMessage, stateP.retryAttempts); } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'UPDATE_SOURCE_MAPPINGS_PROPERTIES') { const res = resW as ExcludeRetryableEsError>; @@ -723,13 +724,11 @@ export const model = (currentState: State, resW: ResponseType): ...stateP, controlState: 'CALCULATE_EXCLUDE_FILTERS', }; - } else if (isTypeof(res.left, 'index_not_found_exception')) { + } else { // We don't handle the following errors as the migration algorithm // will never cause them to occur: // - index_not_found_exception - return throwBadResponse(stateP, res.left as never); - } else { - return throwBadResponse(stateP, res.left); + throwBadResponse(stateP, res.left as never); } } else if (stateP.controlState === 'CALCULATE_EXCLUDE_FILTERS') { const res = resW as ExcludeRetryableEsError>; @@ -753,7 +752,7 @@ export const model = (currentState: State, resW: ResponseType): ], }; } else { - return throwBadResponse(stateP, res); + throwBadResponse(stateP, res); } } else if (stateP.controlState === 'CREATE_REINDEX_TEMP') { const res = resW as ExcludeRetryableEsError>; @@ -788,7 +787,7 @@ export const model = (currentState: State, resW: ResponseType): reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`, }; } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else { // If the createIndex action receives an 'resource_already_exists_exception' @@ -813,14 +812,20 @@ export const model = (currentState: State, resW: ResponseType): return { ...stateP, controlState: 'DONE_REINDEXING_SYNC' }; } } else if (Either.isLeft(res)) { - return { - ...stateP, - controlState: 'FATAL', - reason: 'An error occurred whilst waiting for other migrators to get to this step.', - throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem - }; + const left = res.left; + + if (isTypeof(left, 'synchronization_failed')) { + return { + ...stateP, + controlState: 'FATAL', + reason: 'An error occurred whilst waiting for other migrators to get to this step.', + throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem + }; + } else { + throwBadResponse(stateP, left); + } } else { - return throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_OPEN_PIT') { const res = resW as ExcludeRetryableEsError>; @@ -954,14 +959,20 @@ export const model = (currentState: State, resW: ResponseType): sourceIndexMappings: Option.none, }; } else if (Either.isLeft(res)) { - return { - ...stateP, - controlState: 'FATAL', - reason: 'An error occurred whilst waiting for other migrators to get to this step.', - throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem - }; + const left = res.left; + + if (isTypeof(left, 'synchronization_failed')) { + return { + ...stateP, + controlState: 'FATAL', + reason: 'An error occurred whilst waiting for other migrators to get to this step.', + throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem + }; + } else { + throwBadResponse(stateP, left); + } } else { - return throwBadResponse(stateP, res as never); + throwBadResponse(stateP, res as never); } } else if (stateP.controlState === 'REINDEX_SOURCE_TO_TEMP_TRANSFORM') { // We follow a similar control flow as for @@ -1458,7 +1469,9 @@ export const model = (currentState: State, resW: ResponseType): // index. return { ...stateP, - controlState: 'MARK_VERSION_INDEX_READY', + controlState: stateP.mustRelocateDocuments + ? 'MARK_VERSION_INDEX_READY_SYNC' + : 'MARK_VERSION_INDEX_READY', versionIndexReadyActions: stateP.versionIndexReadyActions, }; } else { @@ -1474,9 +1487,19 @@ export const model = (currentState: State, resW: ResponseType): } else if (stateP.controlState === 'CREATE_NEW_TARGET') { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { + if (res.right === 'index_already_exists') { + // We were supposed to be on a "fresh deployment" state (we did not find any aliases) + // but the target index already exists. Assume it can be from a previous upgrade attempt that: + // - managed to clone ..._reindex_temp into target + // - but did NOT finish the process (aka did not get to update the index aliases) + return { + ...stateP, + controlState: 'OUTDATED_DOCUMENTS_SEARCH_OPEN_PIT', + }; + } return { ...stateP, - controlState: 'MARK_VERSION_INDEX_READY', + controlState: 'CHECK_VERSION_INDEX_READY_ACTIONS', }; } else if (Either.isLeft(res)) { const left = res.left; @@ -1495,7 +1518,7 @@ export const model = (currentState: State, resW: ResponseType): reason: `${CLUSTER_SHARD_LIMIT_EXCEEDED_REASON} See ${stateP.migrationDocLinks.clusterShardLimitExceeded}`, }; } else { - return throwBadResponse(stateP, left); + throwBadResponse(stateP, left); } } else { // If the createIndex action receives an 'resource_already_exists_exception' @@ -1503,7 +1526,10 @@ export const model = (currentState: State, resW: ResponseType): // left responses to handle here. throwBadResponse(stateP, res); } - } else if (stateP.controlState === 'MARK_VERSION_INDEX_READY') { + } else if ( + stateP.controlState === 'MARK_VERSION_INDEX_READY' || + stateP.controlState === 'MARK_VERSION_INDEX_READY_SYNC' + ) { const res = resW as ExcludeRetryableEsError>; if (Either.isRight(res)) { return { ...stateP, controlState: 'DONE' }; @@ -1516,7 +1542,7 @@ export const model = (currentState: State, resW: ResponseType): // migration from the same source. return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' }; } else if (isTypeof(left, 'index_not_found_exception')) { - if (left.index === stateP.tempIndex) { + if (left.index.endsWith(REINDEX_TEMP_SUFFIX)) { // another instance has already completed the migration and deleted // the temporary index return { ...stateP, controlState: 'MARK_VERSION_INDEX_READY_CONFLICT' }; @@ -1531,6 +1557,13 @@ export const model = (currentState: State, resW: ResponseType): // cause it to occur (this error is only relevant to the LEGACY_DELETE // step). throwBadResponse(stateP, left as never); + } else if (isTypeof(left, 'synchronization_failed')) { + return { + ...stateP, + controlState: 'FATAL', + reason: 'An error occurred whilst waiting for other migrators to get to this step.', + throwDelayMillis: 1000, // another migrator has failed for a reason, let it take Kibana down and log its problem + }; } else { throwBadResponse(stateP, left); } @@ -1584,6 +1617,6 @@ export const model = (currentState: State, resW: ResponseType): // The state-action machine will never call the model in the terminating states throwBadControlState(stateP as never); } else { - return throwBadControlState(stateP); + throwBadControlState(stateP); } }; diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts index f7cabfb6e42dbaa..c57cf1f7f07057a 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.test.ts @@ -7,7 +7,7 @@ */ import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import { defer } from './kibana_migrator_utils'; +import { waitGroup } from './kibana_migrator_utils'; import { next } from './next'; import type { State } from './state'; @@ -15,12 +15,24 @@ describe('migrations v2 next', () => { it.todo('when state.retryDelay > 0 delays execution of the next action'); it('DONE returns null', () => { const state = { controlState: 'DONE' } as State; - const action = next({} as ElasticsearchClient, (() => {}) as any, defer(), defer())(state); + const action = next( + {} as ElasticsearchClient, + (() => {}) as any, + waitGroup(), + waitGroup(), + waitGroup() + )(state); expect(action).toEqual(null); }); it('FATAL returns null', () => { const state = { controlState: 'FATAL', reason: '' } as State; - const action = next({} as ElasticsearchClient, (() => {}) as any, defer(), defer())(state); + const action = next( + {} as ElasticsearchClient, + (() => {}) as any, + waitGroup(), + waitGroup(), + waitGroup() + )(state); expect(action).toEqual(null); }); }); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts index 9c028f150a6d0e4..18c6c8409853a9c 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/next.ts @@ -9,7 +9,7 @@ import * as Option from 'fp-ts/lib/Option'; import { omit } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; -import type { Defer } from './kibana_migrator_utils'; +import type { WaitGroup } from './kibana_migrator_utils'; import type { AllActionStates, CalculateExcludeFiltersState, @@ -72,8 +72,9 @@ export type ResponseType = Awaited< export const nextActionMap = ( client: ElasticsearchClient, transformRawDocs: TransformRawDocs, - readyToReindex: Defer, - doneReindexing: Defer + readyToReindex: WaitGroup, + doneReindexing: WaitGroup, + updateRelocationAliases: WaitGroup ) => { return { INIT: (state: InitState) => @@ -142,7 +143,10 @@ export const nextActionMap = ( aliases: [state.tempIndexAlias], mappings: state.tempIndexMappings, }), - READY_TO_REINDEX_SYNC: () => Actions.synchronizeMigrators(readyToReindex), + READY_TO_REINDEX_SYNC: () => + Actions.synchronizeMigrators({ + waitGroup: readyToReindex, + }), REINDEX_SOURCE_TO_TEMP_OPEN_PIT: (state: ReindexSourceToTempOpenPit) => Actions.openPit({ client, index: state.sourceIndex.value }), REINDEX_SOURCE_TO_TEMP_READ: (state: ReindexSourceToTempRead) => @@ -181,7 +185,10 @@ export const nextActionMap = ( */ refresh: false, }), - DONE_REINDEXING_SYNC: () => Actions.synchronizeMigrators(doneReindexing), + DONE_REINDEXING_SYNC: () => + Actions.synchronizeMigrators({ + waitGroup: doneReindexing, + }), SET_TEMP_WRITE_BLOCK: (state: SetTempWriteBlock) => Actions.setWriteBlock({ client, index: state.tempIndex }), CLONE_TEMP_TO_TARGET: (state: CloneTempToTarget) => @@ -249,6 +256,12 @@ export const nextActionMap = ( }), MARK_VERSION_INDEX_READY: (state: MarkVersionIndexReady) => Actions.updateAliases({ client, aliasActions: state.versionIndexReadyActions.value }), + MARK_VERSION_INDEX_READY_SYNC: (state: MarkVersionIndexReady) => + Actions.synchronizeMigrators({ + waitGroup: updateRelocationAliases, + payload: state.versionIndexReadyActions.value, + thenHook: (res) => res, + }), MARK_VERSION_INDEX_READY_CONFLICT: (state: MarkVersionIndexReadyConflict) => Actions.fetchIndices({ client, indices: [state.currentAlias, state.versionAlias] }), LEGACY_SET_WRITE_BLOCK: (state: LegacySetWriteBlockState) => @@ -279,10 +292,17 @@ export const nextActionMap = ( export const next = ( client: ElasticsearchClient, transformRawDocs: TransformRawDocs, - readyToReindex: Defer, - doneReindexing: Defer + readyToReindex: WaitGroup, + doneReindexing: WaitGroup, + updateRelocationAliases: WaitGroup ) => { - const map = nextActionMap(client, transformRawDocs, readyToReindex, doneReindexing); + const map = nextActionMap( + client, + transformRawDocs, + readyToReindex, + doneReindexing, + updateRelocationAliases + ); return (state: State) => { const delay = createDelayFn(state); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts index 280b8fd08f6cf6e..56172384dcdcd85 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.test.ts @@ -13,7 +13,7 @@ import { elasticsearchClientMock } from '@kbn/core-elasticsearch-client-server-m import { loggingSystemMock } from '@kbn/core-logging-server-mocks'; import type { MigrationResult } from '@kbn/core-saved-objects-base-server-internal'; import { createInitialState } from './initial_state'; -import { Defer } from './kibana_migrator_utils'; +import { waitGroup } from './kibana_migrator_utils'; import { migrationStateActionMachine } from './migrations_state_action_machine'; import { next } from './next'; import { runResilientMigrator, type RunResilientMigratorParams } from './run_resilient_migrator'; @@ -128,8 +128,9 @@ const mockOptions = (): RunResilientMigratorParams => { }, }, }, - readyToReindex: new Defer(), - doneReindexing: new Defer(), + readyToReindex: waitGroup(), + doneReindexing: waitGroup(), + updateRelocationAliases: waitGroup(), logger, transformRawDocs: jest.fn(), preMigrationScript: "ctx._id = ctx._source.type + ':' + ctx._id", diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts index 88f6f5757849236..90bf2d2454fdada 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_resilient_migrator.ts @@ -17,7 +17,7 @@ import type { MigrationResult, IndexTypesMap, } from '@kbn/core-saved-objects-base-server-internal'; -import type { Defer } from './kibana_migrator_utils'; +import type { WaitGroup } from './kibana_migrator_utils'; import type { TransformRawDocs } from './types'; import { next } from './next'; import { model } from './model'; @@ -25,6 +25,7 @@ import { createInitialState } from './initial_state'; import { migrationStateActionMachine } from './migrations_state_action_machine'; import { cleanup } from './migrations_state_machine_cleanup'; import type { State } from './state'; +import type { AliasAction } from './actions'; /** * To avoid the Elasticsearch-js client aborting our requests before we @@ -48,8 +49,9 @@ export interface RunResilientMigratorParams { indexTypesMap: IndexTypesMap; targetMappings: IndexMapping; preMigrationScript?: string; - readyToReindex: Defer; - doneReindexing: Defer; + readyToReindex: WaitGroup; + doneReindexing: WaitGroup; + updateRelocationAliases: WaitGroup; logger: Logger; transformRawDocs: TransformRawDocs; coreMigrationVersionPerType: SavedObjectsMigrationVersion; @@ -76,6 +78,7 @@ export async function runResilientMigrator({ preMigrationScript, readyToReindex, doneReindexing, + updateRelocationAliases, transformRawDocs, coreMigrationVersionPerType, migrationVersionPerType, @@ -103,7 +106,13 @@ export async function runResilientMigrator({ return migrationStateActionMachine({ initialState, logger, - next: next(migrationClient, transformRawDocs, readyToReindex, doneReindexing), + next: next( + migrationClient, + transformRawDocs, + readyToReindex, + doneReindexing, + updateRelocationAliases + ), model, abort: async (state?: State) => { // At this point, we could reject this migrator's defers and unblock other migrators diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts index 22d62307aacf89e..a785ff46823e7f2 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.test.ts @@ -22,8 +22,8 @@ import { buildTypesMappings, createIndexMap } from './core'; import { getIndicesInvolvedInRelocation, indexMapToIndexTypesMap, - createMultiPromiseDefer, - Defer, + createWaitGroupMap, + waitGroup, } from './kibana_migrator_utils'; import { runResilientMigrator } from './run_resilient_migrator'; import { indexTypesMapMock, savedObjectTypeRegistryMock } from './run_resilient_migrator.fixtures'; @@ -41,7 +41,7 @@ jest.mock('./kibana_migrator_utils', () => { return { ...actual, indexMapToIndexTypesMap: jest.fn(actual.indexMapToIndexTypesMap), - createMultiPromiseDefer: jest.fn(actual.createMultiPromiseDefer), + createWaitGroupMap: jest.fn(actual.createWaitGroupMap), getIndicesInvolvedInRelocation: jest.fn(() => Promise.resolve(['.my_index', '.other_index'])), }; }); @@ -79,9 +79,7 @@ const mockCreateIndexMap = createIndexMap as jest.MockedFunction; -const mockCreateMultiPromiseDefer = createMultiPromiseDefer as jest.MockedFunction< - typeof createMultiPromiseDefer ->; +const mockCreateWaitGroupMap = createWaitGroupMap as jest.MockedFunction; const mockGetIndicesInvolvedInRelocation = getIndicesInvolvedInRelocation as jest.MockedFunction< typeof getIndicesInvolvedInRelocation >; @@ -93,7 +91,7 @@ describe('runV2Migration', () => { beforeEach(() => { mockCreateIndexMap.mockClear(); mockIndexMapToIndexTypesMap.mockClear(); - mockCreateMultiPromiseDefer.mockClear(); + mockCreateWaitGroupMap.mockClear(); mockGetIndicesInvolvedInRelocation.mockClear(); mockRunResilientMigrator.mockClear(); }); @@ -143,9 +141,14 @@ describe('runV2Migration', () => { const options = mockOptions(); options.documentMigrator.prepareMigrations(); await runV2Migration(options); - expect(createMultiPromiseDefer).toBeCalledTimes(2); - expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']); - expect(createMultiPromiseDefer).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']); + expect(mockCreateWaitGroupMap).toBeCalledTimes(3); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(1, ['.my_index', '.other_index']); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith(2, ['.my_index', '.other_index']); + expect(mockCreateWaitGroupMap).toHaveBeenNthCalledWith( + 3, + ['.my_index', '.other_index'], + expect.any(Function) // we expect to receive a method to update all aliases in this hook + ); }); it('calls runResilientMigrator for each migrator it must spawn', async () => { @@ -168,6 +171,7 @@ describe('runV2Migration', () => { mustRelocateDocuments: true, readyToReindex: expect.any(Object), doneReindexing: expect.any(Object), + updateRelocationAliases: expect.any(Object), }) ); expect(runResilientMigrator).toHaveBeenNthCalledWith( @@ -178,6 +182,7 @@ describe('runV2Migration', () => { mustRelocateDocuments: true, readyToReindex: expect.any(Object), doneReindexing: expect.any(Object), + updateRelocationAliases: expect.any(Object), }) ); expect(runResilientMigrator).toHaveBeenNthCalledWith( @@ -188,14 +193,15 @@ describe('runV2Migration', () => { mustRelocateDocuments: false, readyToReindex: undefined, doneReindexing: undefined, + updateRelocationAliases: undefined, }) ); }); it('awaits on all runResilientMigrator promises, and resolves with the results of each of them', async () => { - const myIndexMigratorDefer = new Defer(); - const otherIndexMigratorDefer = new Defer(); - const taskIndexMigratorDefer = new Defer(); + const myIndexMigratorDefer = waitGroup(); + const otherIndexMigratorDefer = waitGroup(); + const taskIndexMigratorDefer = waitGroup(); let migrationResults: MigrationResult[] | undefined; mockRunResilientMigrator.mockReturnValueOnce(myIndexMigratorDefer.promise); diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts index c50a3c6997598c7..49088ebc147b8aa 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/run_v2_migration.ts @@ -24,12 +24,14 @@ import Semver from 'semver'; import type { DocumentMigrator } from './document_migrator'; import { buildActiveMappings, createIndexMap } from './core'; import { - createMultiPromiseDefer, + createWaitGroupMap, getIndicesInvolvedInRelocation, indexMapToIndexTypesMap, } from './kibana_migrator_utils'; import { runResilientMigrator } from './run_resilient_migrator'; import { migrateRawDocsSafely } from './core/migrate_raw_docs'; +import type { AliasAction } from './actions/update_aliases'; +import { updateAliases } from './actions'; export interface RunV2MigrationOpts { /** The current Kibana version */ @@ -74,7 +76,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise>( + indicesWithRelocatingTypes, + (allAliasActions) => + updateAliases({ + client: options.elasticsearchClient, + aliasActions: allAliasActions.flat(), + })() + ); // build a list of all migrators that must be started const migratorIndices = new Set(Object.keys(indexMap)); - // indices involved in a relocation might no longer be present in current mappings + // the types in indices involved in relocation might not have mappings in the current mappings anymore // but if their SOs must be relocated to another index, we still need a migrator to do the job - indicesWithMovingTypes.forEach((index) => migratorIndices.add(index)); + indicesWithRelocatingTypes.forEach((index) => migratorIndices.add(index)); const migrators = Array.from(migratorIndices).map((indexName, i) => { return { migrate: (): Promise => { - const readyToReindex = readyToReindexDefers[indexName]; - const doneReindexing = doneReindexingDefers[indexName]; + const readyToReindex = readyToReindexWaitGroupMap[indexName]; + const doneReindexing = doneReindexingWaitGroupMap[indexName]; + const updateRelocationAliases = updateAliasesWaitGroupMap[indexName]; // check if this migrator's index is involved in some document redistribution - const mustRelocateDocuments = !!readyToReindex; + const mustRelocateDocuments = indicesWithRelocatingTypes.includes(indexName); return runResilientMigrator({ client: options.elasticsearchClient, @@ -127,6 +138,7 @@ export const runV2Migration = async (options: RunV2MigrationOpts): Promise migrateRawDocsSafely({ serializer: options.serializer, diff --git a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts index 39b7f71ea741599..5dfc5a793046e19 100644 --- a/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts +++ b/packages/core/saved-objects/core-saved-objects-migration-server-internal/src/state.ts @@ -459,6 +459,14 @@ export interface MarkVersionIndexReady extends PostInitState { readonly versionIndexReadyActions: Option.Some; } +export interface MarkVersionIndexReadySync extends PostInitState { + /** Single "client.indices.updateAliases" operation + * to update multiple indices' aliases simultaneously + * */ + readonly controlState: 'MARK_VERSION_INDEX_READY_SYNC'; + readonly versionIndexReadyActions: Option.Some; +} + export interface MarkVersionIndexReadyConflict extends PostInitState { /** * If the MARK_VERSION_INDEX_READY step fails another instance was @@ -541,6 +549,7 @@ export type State = Readonly< | LegacyReindexWaitForTaskState | LegacySetWriteBlockState | MarkVersionIndexReady + | MarkVersionIndexReadySync | MarkVersionIndexReadyConflict | OutdatedDocumentsRefresh | OutdatedDocumentsSearchClosePit diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts index acd03e9556835b4..f68cbf40ba48926 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/actions/actions.test.ts @@ -1876,7 +1876,7 @@ describe('migration actions', () => { expect(res).toMatchInlineSnapshot(` Object { "_tag": "Right", - "right": "create_index_succeeded", + "right": "index_already_exists", } `); }); diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts index 626a89df410a8fa..a7377ec2050d16b 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/dot_kibana_split.test.ts @@ -12,6 +12,7 @@ import { type ISavedObjectTypeRegistry, type SavedObjectsType, MAIN_SAVED_OBJECT_INDEX, + ALL_SAVED_OBJECT_INDICES, } from '@kbn/core-saved-objects-server'; import { DEFAULT_INDEX_TYPES_MAP } from '@kbn/core-saved-objects-base-server-internal'; import { @@ -376,8 +377,8 @@ describe('split .kibana index into multiple system indices', () => { `[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES -> UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK.`, `[${index}] UPDATE_TARGET_MAPPINGS_PROPERTIES_WAIT_FOR_TASK -> UPDATE_TARGET_MAPPINGS_META.`, `[${index}] UPDATE_TARGET_MAPPINGS_META -> CHECK_VERSION_INDEX_READY_ACTIONS.`, - `[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY.`, - `[${index}] MARK_VERSION_INDEX_READY -> DONE.`, + `[${index}] CHECK_VERSION_INDEX_READY_ACTIONS -> MARK_VERSION_INDEX_READY_SYNC.`, + `[${index}] MARK_VERSION_INDEX_READY_SYNC -> DONE.`, `[${index}] Migration completed after`, ], { ordered: true } @@ -395,7 +396,6 @@ describe('split .kibana index into multiple system indices', () => { const { runMigrations } = await migratorTestKitFactory(); await clearLog(logFilePath); await runMigrations(); - const logs = await parseLogFile(logFilePath); expect(logs).not.toContainLogEntries(['REINDEX', 'CREATE', 'UPDATE_TARGET_MAPPINGS']); }); @@ -407,6 +407,7 @@ describe('split .kibana index into multiple system indices', () => { }); // FLAKY: https://github.com/elastic/kibana/issues/157510 + // This test takes too long. Can be manually executed to verify the correct behavior. describe.skip('when multiple Kibana migrators run in parallel', () => { it('correctly migrates 7.7.2_xpack_100k_obj.zip archive', async () => { esServer = await startElasticsearch({ @@ -415,15 +416,29 @@ describe('split .kibana index into multiple system indices', () => { const esClient = await getEsClient(); const breakdownBefore = await getAggregatedTypesCountAllIndices(esClient); - expect(breakdownBefore).toMatchSnapshot('before migration'); + expect(breakdownBefore).toEqual({ + '.kibana': { + 'apm-telemetry': 1, + config: 1, + dashboard: 52994, + 'index-pattern': 1, + search: 1, + space: 1, + 'ui-metric': 5, + visualization: 53004, + }, + '.kibana_task_manager': { + task: 5, + }, + }); for (let i = 0; i < PARALLEL_MIGRATORS; ++i) { await clearLog(Path.join(__dirname, `dot_kibana_split_instance_${i}.log`)); } const testKits = await Promise.all( - new Array(PARALLEL_MIGRATORS) - .fill({ + new Array(PARALLEL_MIGRATORS).fill(true).map((_, index) => + getKibanaMigratorTestKit({ settings: { migrations: { discardUnknownObjects: currentVersion, @@ -432,13 +447,10 @@ describe('split .kibana index into multiple system indices', () => { }, kibanaIndex: MAIN_SAVED_OBJECT_INDEX, types: typeRegistry.getAllTypes(), + defaultIndexTypesMap: DEFAULT_INDEX_TYPES_MAP, + logFilePath: Path.join(__dirname, `dot_kibana_split_instance_${index}.log`), }) - .map((config, index) => - getKibanaMigratorTestKit({ - ...config, - logFilePath: Path.join(__dirname, `dot_kibana_split_instance_${index}.log`), - }) - ) + ) ); const results = await Promise.all(testKits.map((testKit) => testKit.runMigrations())); @@ -448,9 +460,30 @@ describe('split .kibana index into multiple system indices', () => { .every((result) => result.status === 'migrated' || result.status === 'patched') ).toEqual(true); + await esClient.indices.refresh({ index: ALL_SAVED_OBJECT_INDICES }); + const breakdownAfter = await getAggregatedTypesCountAllIndices(esClient); - expect(breakdownAfter).toMatchSnapshot('after migration'); - }); + expect(breakdownAfter).toEqual({ + '.kibana': { + 'apm-telemetry': 1, + config: 1, + space: 1, + 'ui-metric': 5, + }, + '.kibana_alerting_cases': {}, + '.kibana_analytics': { + dashboard: 52994, + 'index-pattern': 1, + search: 1, + visualization: 53004, + }, + '.kibana_ingest': {}, + '.kibana_security_solution': {}, + '.kibana_task_manager': { + task: 5, + }, + }); + }, 1200000); afterEach(async () => { await esServer?.stop(); diff --git a/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts b/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts index 5ae104007205935..b723ca1b6260878 100644 --- a/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts +++ b/src/core/server/integration_tests/saved_objects/migrations/group3/incompatible_cluster_routing_allocation.test.ts @@ -161,7 +161,7 @@ describe('incompatible_cluster_routing_allocation', () => { .map((str) => JSON5.parse(str)) as LogRecord[]; expect( - records.find((rec) => rec.message.includes('MARK_VERSION_INDEX_READY -> DONE')) + records.find((rec) => rec.message.includes('MARK_VERSION_INDEX_READY_SYNC -> DONE')) ).toBeDefined(); }, { retryAttempts: 100, retryDelayMs: 500 }