Skip to content

Commit

Permalink
[Migrations] Update all aliases with a single updateAliases() when relocating SO documents (#158940)
Browse files Browse the repository at this point in the history

Fixes #158733

The goal of this modification is to ensure that the migrators of all indices
involved in a relocation (e.g. as part of the [dot kibana
split](#104081)) create the
index aliases in the same `updateAliases()` call.

This way, either:
* all the indices involved in the [dot kibana
split](#104081) relocation will
be completely upgraded (with the appropriate aliases).
* or none of them will.
  • Loading branch information
gsoldevila committed Jun 5, 2023
1 parent 9bf4ad7 commit 94fb44a
Show file tree
Hide file tree
Showing 19 changed files with 333 additions and 193 deletions.
Expand Up @@ -12,7 +12,6 @@ import { pipe } from 'fp-ts/lib/pipeable';
import * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import type { IndexMapping } from '@kbn/core-saved-objects-base-server-internal';
import type { AcknowledgeResponse } from '.';
import {
catchRetryableEsClientErrors,
type RetryableEsClientError,
Expand Down Expand Up @@ -46,6 +45,9 @@ export interface CreateIndexParams {
aliases?: string[];
timeout?: string;
}

export type CreateIndexSuccessResponse = 'create_index_succeeded' | 'index_already_exists';

/**
* Creates an index with the given mappings
*
Expand All @@ -64,11 +66,11 @@ export const createIndex = ({
timeout = DEFAULT_TIMEOUT,
}: CreateIndexParams): TaskEither.TaskEither<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
'create_index_succeeded'
CreateIndexSuccessResponse
> => {
const createIndexTask: TaskEither.TaskEither<
RetryableEsClientError | ClusterShardLimitExceeded,
AcknowledgeResponse
CreateIndexSuccessResponse
> = () => {
const aliasesObject = aliasArrayToRecord(aliases);

Expand Down Expand Up @@ -103,31 +105,12 @@ export const createIndex = ({
},
},
})
.then((res) => {
/**
* - acknowledged=false, we timed out before the cluster state was
* updated on all nodes with the newly created index, but it
* probably will be created sometime soon.
* - shards_acknowledged=false, we timed out before all shards were
* started
* - acknowledged=true, shards_acknowledged=true, index creation complete
*/
return Either.right({
acknowledged: Boolean(res.acknowledged),
shardsAcknowledged: res.shards_acknowledged,
});
.then(() => {
return Either.right('create_index_succeeded' as const);
})
.catch((error) => {
if (error?.body?.error?.type === 'resource_already_exists_exception') {
/**
* If the target index already exists it means a previous create
* operation had already been started. However, we can't be sure
* that all shards were started so return shardsAcknowledged: false
*/
return Either.right({
acknowledged: true,
shardsAcknowledged: false,
});
return Either.right('index_already_exists' as const);
} else if (isClusterShardLimitExceeded(error?.body?.error)) {
return Either.left({
type: 'cluster_shard_limit_exceeded' as const,
Expand All @@ -143,11 +126,12 @@ export const createIndex = ({
createIndexTask,
TaskEither.chain<
RetryableEsClientError | IndexNotGreenTimeout | ClusterShardLimitExceeded,
AcknowledgeResponse,
'create_index_succeeded'
CreateIndexSuccessResponse,
CreateIndexSuccessResponse
>((res) => {
// Systematicaly wait until the target index has a 'green' status meaning
// the primary (and on multi node clusters) the replica has been started
// When the index status is 'green' we know that all shards were started
// see https://github.com/elastic/kibana/issues/157968
return pipe(
waitForIndexStatus({
Expand All @@ -156,10 +140,7 @@ export const createIndex = ({
timeout: DEFAULT_TIMEOUT,
status: 'green',
}),
TaskEither.map(() => {
/** When the index status is 'green' we know that all shards were started */
return 'create_index_succeeded';
})
TaskEither.map(() => res)
);
})
);
Expand Down
Expand Up @@ -106,7 +106,8 @@ export {

import type { UnknownDocsFound } from './check_for_unknown_docs';
import type { IncompatibleClusterRoutingAllocation } from './initialize_action';
import { ClusterShardLimitExceeded } from './create_index';
import type { ClusterShardLimitExceeded } from './create_index';
import type { SynchronizationFailed } from './synchronize_migrators';

export type {
CheckForUnknownDocsParams,
Expand Down Expand Up @@ -174,6 +175,7 @@ export interface ActionErrorTypeMap {
index_not_yellow_timeout: IndexNotYellowTimeout;
cluster_shard_limit_exceeded: ClusterShardLimitExceeded;
es_response_too_large: EsResponseTooLargeError;
synchronization_failed: SynchronizationFailed;
}

/**
Expand Down
Expand Up @@ -6,38 +6,36 @@
* Side Public License, v 1.
*/
import { synchronizeMigrators } from './synchronize_migrators';
import { type Defer, defer } from '../kibana_migrator_utils';
import { type WaitGroup, waitGroup as createWaitGroup } from '../kibana_migrator_utils';

describe('synchronizeMigrators', () => {
let defers: Array<Defer<void>>;
let allDefersPromise: Promise<any>;
let migratorsDefers: Array<Defer<void>>;
let waitGroups: Array<WaitGroup<void>>;
let allWaitGroupsPromise: Promise<any>;
let migratorsWaitGroups: Array<WaitGroup<void>>;

beforeEach(() => {
jest.clearAllMocks();

defers = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(defer);
allDefersPromise = Promise.all(defers.map(({ promise }) => promise));
waitGroups = ['.kibana_cases', '.kibana_task_manager', '.kibana'].map(createWaitGroup);
allWaitGroupsPromise = Promise.all(waitGroups.map(({ promise }) => promise));

migratorsDefers = defers.map(({ resolve, reject }) => ({
migratorsWaitGroups = waitGroups.map(({ resolve, reject }) => ({
resolve: jest.fn(resolve),
reject: jest.fn(reject),
promise: allDefersPromise,
promise: allWaitGroupsPromise,
}));
});

describe('when all migrators reach the synchronization point with a correct state', () => {
it('unblocks all migrators and resolves Right', async () => {
const tasks = migratorsDefers.map((migratorDefer) => synchronizeMigrators(migratorDefer));
const tasks = migratorsWaitGroups.map((waitGroup) => synchronizeMigrators({ waitGroup }));

const res = await Promise.all(tasks.map((task) => task()));

migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.resolve).toHaveBeenCalledTimes(1)
);
migratorsDefers.forEach((migratorDefer) =>
expect(migratorDefer.reject).not.toHaveBeenCalled()
migratorsWaitGroups.forEach((waitGroup) =>
expect(waitGroup.resolve).toHaveBeenCalledTimes(1)
);
migratorsWaitGroups.forEach((waitGroup) => expect(waitGroup.reject).not.toHaveBeenCalled());

expect(res).toEqual([
{ _tag: 'Right', right: 'synchronized_successfully' },
Expand All @@ -48,13 +46,11 @@ describe('synchronizeMigrators', () => {

it('migrators are not unblocked until the last one reaches the synchronization point', async () => {
let resolved: number = 0;
migratorsDefers.forEach((migratorDefer) => migratorDefer.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
migratorsWaitGroups.forEach((waitGroup) => waitGroup.promise.then(() => ++resolved));
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we simulate that only kibana_task_manager and kibana migrators get to the sync point
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));
// we don't await for them, or we would be locked forever
Promise.all(tasks.map((task) => task()));

Expand All @@ -65,7 +61,7 @@ describe('synchronizeMigrators', () => {
expect(resolved).toEqual(0);

// finally, the last migrator gets to the synchronization point
await synchronizeMigrators(casesDefer)();
await synchronizeMigrators({ waitGroup: casesDefer })();
expect(resolved).toEqual(3);
});
});
Expand All @@ -75,31 +71,29 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// we first make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');

// the other migrators then try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

expect(Promise.all(tasks.map((task) => task()))).resolves.toEqual([
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
Expand All @@ -116,15 +110,13 @@ describe('synchronizeMigrators', () => {
it('synchronizedMigrators resolves Left for the rest of migrators', async () => {
let resolved: number = 0;
let errors: number = 0;
migratorsDefers.forEach((migratorDefer) =>
migratorDefer.promise.then(() => ++resolved).catch(() => ++errors)
migratorsWaitGroups.forEach((waitGroup) =>
waitGroup.promise.then(() => ++resolved).catch(() => ++errors)
);
const [casesDefer, ...otherMigratorsDefers] = migratorsDefers;
const [casesDefer, ...otherMigratorsDefers] = migratorsWaitGroups;

// some migrators try to synchronize
const tasks = otherMigratorsDefers.map((migratorDefer) =>
synchronizeMigrators(migratorDefer)
);
const tasks = otherMigratorsDefers.map((waitGroup) => synchronizeMigrators({ waitGroup }));

// we then make one random migrator fail and not reach the sync point
casesDefer.reject('Oops. The cases migrator failed unexpectedly.');
Expand All @@ -133,14 +125,14 @@ describe('synchronizeMigrators', () => {
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
{
_tag: 'Left',
left: {
type: 'sync_failed',
type: 'synchronization_failed',
error: 'Oops. The cases migrator failed unexpectedly.',
},
},
Expand Down
Expand Up @@ -8,20 +8,33 @@

import * as Either from 'fp-ts/lib/Either';
import * as TaskEither from 'fp-ts/lib/TaskEither';
import type { Defer } from '../kibana_migrator_utils';
import type { WaitGroup } from '../kibana_migrator_utils';

export interface SyncFailed {
type: 'sync_failed';
/** @internal */
export interface SynchronizationFailed {
type: 'synchronization_failed';
error: Error;
}

export function synchronizeMigrators(
defer: Defer<void>
): TaskEither.TaskEither<SyncFailed, 'synchronized_successfully'> {
/** @internal */
export interface SynchronizeMigratorsParams<T, U> {
waitGroup: WaitGroup<T>;
thenHook?: (res: any) => Either.Right<U>;
payload?: T;
}

export function synchronizeMigrators<T, U>({
waitGroup,
payload,
thenHook = () =>
Either.right(
'synchronized_successfully' as const
) as Either.Right<'synchronized_successfully'> as unknown as Either.Right<U>,
}: SynchronizeMigratorsParams<T, U>): TaskEither.TaskEither<SynchronizationFailed, U> {
return () => {
defer.resolve();
return defer.promise
.then(() => Either.right('synchronized_successfully' as const))
.catch((error) => Either.left({ type: 'sync_failed' as const, error }));
waitGroup.resolve(payload);
return waitGroup.promise
.then((res) => (thenHook ? thenHook(res) : res))
.catch((error) => Either.left({ type: 'synchronization_failed' as const, error }));
};
}
Expand Up @@ -17,24 +17,24 @@ import { MAIN_SAVED_OBJECT_INDEX } from '@kbn/core-saved-objects-server';
import { loggerMock } from '@kbn/logging-mocks';
import {
calculateTypeStatuses,
createMultiPromiseDefer,
createWaitGroupMap,
getCurrentIndexTypesMap,
getIndicesInvolvedInRelocation,
indexMapToIndexTypesMap,
} from './kibana_migrator_utils';
import { INDEX_MAP_BEFORE_SPLIT } from './kibana_migrator_utils.fixtures';

describe('createMultiPromiseDefer', () => {
describe('createWaitGroupMap', () => {
it('creates defer objects with the same Promise', () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
expect(Object.keys(defers)).toHaveLength(2);
expect(defers['.kibana'].promise).toEqual(defers['.kibana_cases'].promise);
expect(defers['.kibana'].resolve).not.toEqual(defers['.kibana_cases'].resolve);
expect(defers['.kibana'].reject).not.toEqual(defers['.kibana_cases'].reject);
});

it('the common Promise resolves when all defers resolve', async () => {
const defers = createMultiPromiseDefer(['.kibana', '.kibana_cases']);
const defers = createWaitGroupMap(['.kibana', '.kibana_cases']);
let resolved = 0;
Object.values(defers).forEach((defer) => defer.promise.then(() => ++resolved));
defers['.kibana'].resolve();
Expand Down

0 comments on commit 94fb44a

Please sign in to comment.