Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fleet] Increase package install max timeout + add concurrency control to rollovers #166775

Merged
merged 5 commits into from Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/common/constants/epm.ts
Expand Up @@ -9,7 +9,7 @@ import { ElasticsearchAssetType, KibanaAssetType } from '../types/models';

export const PACKAGES_SAVED_OBJECT_TYPE = 'epm-packages';
export const ASSETS_SAVED_OBJECT_TYPE = 'epm-packages-assets';
export const MAX_TIME_COMPLETE_INSTALL = 60000;
export const MAX_TIME_COMPLETE_INSTALL = 30 * 60 * 1000; // 30 minutes

export const FLEET_SYSTEM_PACKAGE = 'system';
export const FLEET_ELASTIC_AGENT_PACKAGE = 'elastic_agent';
Expand Down
Expand Up @@ -11,6 +11,8 @@ import type {
MappingTypeMapping,
} from '@elastic/elasticsearch/lib/api/typesWithBodyKey';

import pMap from 'p-map';

import type { Field, Fields } from '../../fields/field';
import type {
RegistryDataStream,
Expand Down Expand Up @@ -729,15 +731,22 @@ const updateAllDataStreams = async (
esClient: ElasticsearchClient,
logger: Logger
): Promise<void> => {
const updatedataStreamPromises = indexNameWithTemplates.map((templateEntry) => {
return updateExistingDataStream({
esClient,
logger,
dataStreamName: templateEntry.dataStreamName,
});
});
await Promise.all(updatedataStreamPromises);
await pMap(
indexNameWithTemplates,
(templateEntry) => {
return updateExistingDataStream({
esClient,
logger,
dataStreamName: templateEntry.dataStreamName,
});
},
{
// Limit concurrent putMapping/rollover requests to avoid overwhelming the ES cluster
concurrency: 20,
}
);
};

const updateExistingDataStream = async ({
dataStreamName,
esClient,
Expand Down
Expand Up @@ -5,11 +5,21 @@
* 2.0.
*/

import type { SavedObjectsClientContract, ElasticsearchClient } from '@kbn/core/server';
import type {
SavedObjectsClientContract,
ElasticsearchClient,
SavedObject,
} from '@kbn/core/server';
import { savedObjectsClientMock, elasticsearchServiceMock } from '@kbn/core/server/mocks';
import { loggerMock } from '@kbn/logging-mocks';
import { DEFAULT_SPACE_ID } from '@kbn/spaces-plugin/common/constants';

import { ConcurrentInstallOperationError } from '../../../errors';

import type { Installation } from '../../../../common';

import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../../common';

import { appContextService } from '../../app_context';
import { createAppContextStartContractMock } from '../../../mocks';
import { saveArchiveEntries } from '../archive/storage';
Expand All @@ -29,7 +39,9 @@ jest.mock('../elasticsearch/datastream_ilm/install');
import { updateCurrentWriteIndices } from '../elasticsearch/template/template';
import { installKibanaAssetsAndReferences } from '../kibana/assets/install';

import { installIndexTemplatesAndPipelines } from './install';
import { MAX_TIME_COMPLETE_INSTALL } from '../../../../common/constants';

import { installIndexTemplatesAndPipelines, restartInstallation } from './install';

import { _installPackage } from './_install_package';

Expand Down Expand Up @@ -69,9 +81,7 @@ describe('_installPackage', () => {
jest.mocked(saveArchiveEntries).mockResolvedValue({
saved_objects: [],
});
});
afterEach(async () => {
appContextService.stop();
jest.mocked(restartInstallation).mockReset();
});
it('handles errors from installKibanaAssets', async () => {
// force errors from this function
Expand Down Expand Up @@ -226,4 +236,128 @@ describe('_installPackage', () => {
expect(installILMPolicy).toBeCalled();
expect(installIlmForDataStream).toBeCalled();
});

// Covers the branch of `_installPackage` that handles an existing installation
// whose saved object still reports `install_status: 'installing'`.
// Each case only varies `install_started_at` (relative to MAX_TIME_COMPLETE_INSTALL)
// and the `force` flag, then checks whether the install is restarted or rejected.
describe('when package is stuck in `installing`', () => {
  // Baseline saved object for a package whose install never left the `installing` state.
  // Individual tests override `install_started_at` to move it inside/outside the timeout window.
  const mockInstalledPackageSo: SavedObject<Installation> = {
    id: 'mocked-package',
    attributes: {
      name: 'test-package',
      version: '1.0.0',
      install_status: 'installing',
      install_version: '1.0.0',
      install_started_at: new Date().toISOString(),
      install_source: 'registry',
      verification_status: 'verified',
      installed_kibana: [] as any,
      installed_es: [] as any,
      es_index_patterns: {},
    },
    type: PACKAGES_SAVED_OBJECT_TYPE,
    references: [],
  };

  beforeEach(() => {
    appContextService.start(
      createAppContextStartContractMock({
        internal: {
          disableILMPolicies: true,
          disableProxies: false,
          fleetServerStandalone: false,
          onlyAllowAgentUpgradeToKnownVersions: false,
          registry: {
            kibanaVersionCheckEnabled: true,
            capabilities: [],
          },
        },
      })
    );
  });

  describe('timeout reached', () => {
    it('restarts installation', async () => {
      await _installPackage({
        savedObjectsClient: soClient,
        // @ts-ignore
        savedObjectsImporter: jest.fn(),
        esClient,
        logger: loggerMock.create(),
        paths: [],
        packageInfo: {
          name: mockInstalledPackageSo.attributes.name,
          version: mockInstalledPackageSo.attributes.version,
          title: mockInstalledPackageSo.attributes.name,
        } as any,
        installedPkg: {
          ...mockInstalledPackageSo,
          attributes: {
            ...mockInstalledPackageSo.attributes,
            // Started twice the allowed duration ago, i.e. well past the timeout.
            install_started_at: new Date(
              Date.now() - MAX_TIME_COMPLETE_INSTALL * 2
            ).toISOString(),
          },
        },
      });

      expect(restartInstallation).toBeCalled();
    });
  });

  describe('timeout not reached', () => {
    describe('force flag not provided', () => {
      it('throws concurrent installation error if force flag is not provided', async () => {
        // The `.rejects` assertion returns a promise; it must be awaited, otherwise the
        // test finishes before the assertion runs and can never fail.
        await expect(
          _installPackage({
            savedObjectsClient: soClient,
            // @ts-ignore
            savedObjectsImporter: jest.fn(),
            esClient,
            logger: loggerMock.create(),
            paths: [],
            packageInfo: {
              name: mockInstalledPackageSo.attributes.name,
              version: mockInstalledPackageSo.attributes.version,
              title: mockInstalledPackageSo.attributes.name,
            } as any,
            installedPkg: {
              ...mockInstalledPackageSo,
              attributes: {
                ...mockInstalledPackageSo.attributes,
                // Started one second ago, i.e. still inside the timeout window.
                install_started_at: new Date(Date.now() - 1000).toISOString(),
              },
            },
          })
        ).rejects.toThrowError(ConcurrentInstallOperationError);
      });
    });

    describe('force flag provided', () => {
      it('restarts installation', async () => {
        await _installPackage({
          savedObjectsClient: soClient,
          // @ts-ignore
          savedObjectsImporter: jest.fn(),
          esClient,
          logger: loggerMock.create(),
          paths: [],
          packageInfo: {
            name: mockInstalledPackageSo.attributes.name,
            version: mockInstalledPackageSo.attributes.version,
            title: mockInstalledPackageSo.attributes.name,
          } as any,
          installedPkg: {
            ...mockInstalledPackageSo,
            attributes: {
              ...mockInstalledPackageSo.attributes,
              // Still inside the timeout window; only `force` should allow the restart.
              install_started_at: new Date(Date.now() - 1000).toISOString(),
            },
          },
          force: true,
        });

        expect(restartInstallation).toBeCalled();
      });
    });
  });
});
});
Expand Up @@ -99,18 +99,30 @@ export async function _installPackage({
try {
// if some installation already exists
if (installedPkg) {
const isStatusInstalling = installedPkg.attributes.install_status === 'installing';
const hasExceededTimeout =
Date.now() - Date.parse(installedPkg.attributes.install_started_at) <
MAX_TIME_COMPLETE_INSTALL;

// if the installation is currently running, don't try to install
// instead, only return already installed assets
if (
installedPkg.attributes.install_status === 'installing' &&
Date.now() - Date.parse(installedPkg.attributes.install_started_at) <
MAX_TIME_COMPLETE_INSTALL
) {
throw new ConcurrentInstallOperationError(
`Concurrent installation or upgrade of ${pkgName || 'unknown'}-${
pkgVersion || 'unknown'
} detected, aborting.`
);
if (isStatusInstalling && hasExceededTimeout) {
// If this is a forced installation, ignore the timeout and restart the installation anyway
if (force) {
await restartInstallation({
savedObjectsClient,
pkgName,
pkgVersion,
installSource,
verificationResult,
});
} else {
throw new ConcurrentInstallOperationError(
`Concurrent installation or upgrade of ${pkgName || 'unknown'}-${
pkgVersion || 'unknown'
} detected, aborting.`
);
}
Comment on lines +109 to +125
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid the edge case mentioned in #166761 (comment), I'm allowing forced installation to immediately trigger a restart of a stuck installation without honoring the timeout.

} else {
// if no installation is running, or the installation has been running longer than MAX_TIME_COMPLETE_INSTALL
// (it might be stuck) update the saved object and proceed
Expand Down