Skip to content

Commit

Permalink
[Ingest Manager] Optimize installation of integration (#67708) (#67869)
Browse files Browse the repository at this point in the history
* call getArchiveInfo once first, pass paths to template

* pass paths to installPreBuiltTemplates

* pass paths to installILMPolicy

* pass paths to ingest pipeline creation

* use correct package key for cache

* pass paths to kibana assets

* cache other installed packages

* create function for ensuring packages are cached

* remove unused imports

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
neptunian and elasticmachine committed Jun 1, 2020
1 parent e236701 commit 68e103e
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,8 @@
import { CallESAsCurrentUser, ElasticsearchAssetType } from '../../../../types';
import * as Registry from '../../registry';

export async function installILMPolicy(
pkgName: string,
pkgVersion: string,
callCluster: CallESAsCurrentUser
) {
const ilmPaths = await Registry.getArchiveInfo(
pkgName,
pkgVersion,
(entry: Registry.ArchiveEntry) => isILMPolicy(entry)
);
export async function installILMPolicy(paths: string[], callCluster: CallESAsCurrentUser) {
const ilmPaths = paths.filter((path) => isILMPolicy(path));
if (!ilmPaths.length) return;
await Promise.all(
ilmPaths.map(async (path) => {
Expand All @@ -36,7 +28,7 @@ export async function installILMPolicy(
})
);
}
const isILMPolicy = ({ path }: Registry.ArchiveEntry) => {
const isILMPolicy = (path: string) => {
const pathParts = Registry.pathParts(path);
return pathParts.type === ElasticsearchAssetType.ilmPolicy;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ interface RewriteSubstitution {

export const installPipelines = async (
registryPackage: RegistryPackage,
paths: string[],
callCluster: CallESAsCurrentUser
) => {
const datasets = registryPackage.datasets;
const pipelinePaths = paths.filter((path) => isPipeline(path));
if (datasets) {
const pipelines = datasets.reduce<Array<Promise<AssetReference[]>>>((acc, dataset) => {
if (dataset.ingest_pipeline) {
acc.push(
installPipelinesForDataset({
dataset,
callCluster,
pkgName: registryPackage.name,
paths: pipelinePaths,
pkgVersion: registryPackage.version,
})
);
Expand Down Expand Up @@ -67,20 +69,16 @@ export function rewriteIngestPipeline(

export async function installPipelinesForDataset({
callCluster,
pkgName,
pkgVersion,
paths,
dataset,
}: {
callCluster: CallESAsCurrentUser;
pkgName: string;
pkgVersion: string;
paths: string[];
dataset: Dataset;
}): Promise<AssetReference[]> {
const pipelinePaths = await Registry.getArchiveInfo(
pkgName,
pkgVersion,
(entry: Registry.ArchiveEntry) => isDatasetPipeline(entry, dataset.path)
);
const pipelinePaths = paths.filter((path) => isDatasetPipeline(path, dataset.path));
let pipelines: any[] = [];
const substitutions: RewriteSubstitution[] = [];

Expand Down Expand Up @@ -152,8 +150,8 @@ async function installPipeline({
}

const isDirectory = ({ path }: Registry.ArchiveEntry) => path.endsWith('/');
const isDatasetPipeline = ({ path }: Registry.ArchiveEntry, datasetName: string) => {
// TODO: better way to get particular assets

const isDatasetPipeline = (path: string, datasetName: string) => {
const pathParts = Registry.pathParts(path);
return (
!isDirectory({ path }) &&
Expand All @@ -162,6 +160,10 @@ const isDatasetPipeline = ({ path }: Registry.ArchiveEntry, datasetName: string)
datasetName === pathParts.dataset
);
};
const isPipeline = (path: string) => {
const pathParts = Registry.pathParts(path);
return pathParts.type === ElasticsearchAssetType.ingestPipeline;
};

// XXX: assumes path/to/file.ext -- 0..n '/' and exactly one '.'
const getNameAndExtension = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ export const installTemplates = async (
registryPackage: RegistryPackage,
callCluster: CallESAsCurrentUser,
pkgName: string,
pkgVersion: string
pkgVersion: string,
paths: string[]
): Promise<TemplateRef[]> => {
// install any pre-built index template assets,
// atm, this is only the base package's global index templates
// Install component templates first, as they are used by the index templates
await installPreBuiltComponentTemplates(pkgName, pkgVersion, callCluster);
await installPreBuiltTemplates(pkgName, pkgVersion, callCluster);
await installPreBuiltComponentTemplates(paths, callCluster);
await installPreBuiltTemplates(paths, callCluster);

// build templates per dataset from yml files
const datasets = registryPackage.datasets;
Expand All @@ -44,16 +45,8 @@ export const installTemplates = async (
return [];
};

const installPreBuiltTemplates = async (
pkgName: string,
pkgVersion: string,
callCluster: CallESAsCurrentUser
) => {
const templatePaths = await Registry.getArchiveInfo(
pkgName,
pkgVersion,
(entry: Registry.ArchiveEntry) => isTemplate(entry)
);
const installPreBuiltTemplates = async (paths: string[], callCluster: CallESAsCurrentUser) => {
const templatePaths = paths.filter((path) => isTemplate(path));
const templateInstallPromises = templatePaths.map(async (path) => {
const { file } = Registry.pathParts(path);
const templateName = file.substr(0, file.lastIndexOf('.'));
Expand Down Expand Up @@ -95,15 +88,10 @@ const installPreBuiltTemplates = async (
};

const installPreBuiltComponentTemplates = async (
pkgName: string,
pkgVersion: string,
paths: string[],
callCluster: CallESAsCurrentUser
) => {
const templatePaths = await Registry.getArchiveInfo(
pkgName,
pkgVersion,
(entry: Registry.ArchiveEntry) => isComponentTemplate(entry)
);
const templatePaths = paths.filter((path) => isComponentTemplate(path));
const templateInstallPromises = templatePaths.map(async (path) => {
const { file } = Registry.pathParts(path);
const templateName = file.substr(0, file.lastIndexOf('.'));
Expand Down Expand Up @@ -134,12 +122,12 @@ const installPreBuiltComponentTemplates = async (
}
};

const isTemplate = ({ path }: Registry.ArchiveEntry) => {
const isTemplate = (path: string) => {
const pathParts = Registry.pathParts(path);
return pathParts.type === ElasticsearchAssetType.indexTemplate;
};

const isComponentTemplate = ({ path }: Registry.ArchiveEntry) => {
const isComponentTemplate = (path: string) => {
const pathParts = Registry.pathParts(path);
return pathParts.type === ElasticsearchAssetType.componentTemplate;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ export async function installIndexPatterns(
savedObjectsClient,
InstallationStatus.installed
);

// TODO: move to install package
// cache all installed packages if they don't exist
const packagePromises = installedPackages.map((pkg) =>
Registry.ensureCachedArchiveInfo(pkg.pkgName, pkg.pkgVersion)
);
await Promise.all(packagePromises);

if (pkgName && pkgVersion) {
// add this package to the array if it doesn't already exist
const foundPkg = installedPackages.find((pkg) => pkg.pkgName === pkgName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import { RegistryPackage } from '../../../types';
import * as Registry from '../registry';
import { cacheHas } from '../registry/cache';
import { ensureCachedArchiveInfo } from '../registry';

// paths from RegistryPackage are routes to the assets on EPR
// e.g. `/package/nginx/1.2.0/dataset/access/fields/fields.yml`
Expand Down Expand Up @@ -57,8 +57,8 @@ export async function getAssetsData(
datasetName?: string
): Promise<Registry.ArchiveEntry[]> {
// TODO: Needs to be called to fill the cache but should not be required
const pkgkey = packageInfo.name + '-' + packageInfo.version;
if (!cacheHas(pkgkey)) await Registry.getArchiveInfo(packageInfo.name, packageInfo.version);

await ensureCachedArchiveInfo(packageInfo.name, packageInfo.version);

// Gather all asset data
const assets = getAssets(packageInfo, filter, datasetName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export async function installPackage(options: {
const { savedObjectsClient, pkgkey, callCluster } = options;
// TODO: change epm API to /packageName/version so we don't need to do this
const [pkgName, pkgVersion] = pkgkey.split('-');

const paths = await Registry.getArchiveInfo(pkgName, pkgVersion);
// see if some version of this package is already installed
// TODO: calls to getInstallationObject, Registry.fetchInfo, and Registry.fetchFindLatestPackge
// and be replaced by getPackageInfo after adjusting for it to not group/use archive assets
Expand Down Expand Up @@ -119,23 +119,25 @@ export async function installPackage(options: {
savedObjectsClient,
pkgName,
pkgVersion,
paths,
}),
installPipelines(registryPackageInfo, callCluster),
installPipelines(registryPackageInfo, paths, callCluster),
// index patterns and ilm policies are not currently associated with a particular package
// so we do not save them in the package saved object state.
installIndexPatterns(savedObjectsClient, pkgName, pkgVersion),
// currenly only the base package has an ILM policy
// at some point ILM policies can be installed/modified
// per dataset and we should then save them
installILMPolicy(pkgName, pkgVersion, callCluster),
installILMPolicy(paths, callCluster),
]);

// install or update the templates
const installedTemplates = await installTemplates(
registryPackageInfo,
callCluster,
pkgName,
pkgVersion
pkgVersion,
paths
);
const toSaveESIndexPatterns = generateESIndexPatterns(registryPackageInfo.datasets);

Expand Down Expand Up @@ -186,13 +188,14 @@ export async function installKibanaAssets(options: {
savedObjectsClient: SavedObjectsClientContract;
pkgName: string;
pkgVersion: string;
paths: string[];
}) {
const { savedObjectsClient, pkgName, pkgVersion } = options;
const { savedObjectsClient, paths } = options;

// Only install Kibana assets during package installation.
const kibanaAssetTypes = Object.values(KibanaAssetType);
const installationPromises = kibanaAssetTypes.map(async (assetType) =>
installKibanaSavedObjects({ savedObjectsClient, pkgName, pkgVersion, assetType })
installKibanaSavedObjects({ savedObjectsClient, assetType, paths })
);

// installKibanaSavedObjects returns AssetReference[], so .map creates AssetReference[][]
Expand Down Expand Up @@ -237,19 +240,16 @@ export async function saveInstallationReferences(options: {

async function installKibanaSavedObjects({
savedObjectsClient,
pkgName,
pkgVersion,
assetType,
paths,
}: {
savedObjectsClient: SavedObjectsClientContract;
pkgName: string;
pkgVersion: string;
assetType: KibanaAssetType;
paths: string[];
}) {
const isSameType = ({ path }: Registry.ArchiveEntry) =>
assetType === Registry.pathParts(path).type;
const paths = await Registry.getArchiveInfo(pkgName, pkgVersion, isSameType);
const toBeSavedObjects = await Promise.all(paths.map(getObject));
const isSameType = (path: string) => assetType === Registry.pathParts(path).type;
const pathsOfType = paths.filter((path) => isSameType(path));
const toBeSavedObjects = await Promise.all(pathsOfType.map(getObject));

if (toBeSavedObjects.length === 0) {
return [];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ const cache: Map<string, Buffer> = new Map();
export const cacheGet = (key: string) => cache.get(key);
export const cacheSet = (key: string, value: Buffer) => cache.set(key, value);
export const cacheHas = (key: string) => cache.has(key);
export const getCacheKey = (key: string) => key + '.tar.gz';
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
RegistrySearchResults,
RegistrySearchResult,
} from '../../../types';
import { cacheGet, cacheSet } from './cache';
import { cacheGet, cacheSet, getCacheKey, cacheHas } from './cache';
import { ArchiveEntry, untarBuffer } from './extract';
import { fetchUrl, getResponse, getResponseStream } from './requests';
import { streamToBuffer } from './streams';
Expand Down Expand Up @@ -135,7 +135,7 @@ async function extract(

async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
// assume .tar.gz for now. add support for .zip if/when we need it
const key = `${pkgName}-${pkgVersion}.tar.gz`;
const key = getCacheKey(`${pkgName}-${pkgVersion}`);
let buffer = cacheGet(key);
if (!buffer) {
buffer = await fetchArchiveBuffer(pkgName, pkgVersion);
Expand All @@ -149,6 +149,13 @@ async function getOrFetchArchiveBuffer(pkgName: string, pkgVersion: string): Pro
}
}

export async function ensureCachedArchiveInfo(name: string, version: string) {
const pkgkey = getCacheKey(`${name}-${version}`);
if (!cacheHas(pkgkey)) {
await getArchiveInfo(name, version);
}
}

async function fetchArchiveBuffer(pkgName: string, pkgVersion: string): Promise<Buffer> {
const { download: archivePath } = await fetchInfo(pkgName, pkgVersion);
const registryUrl = getRegistryUrl();
Expand Down

0 comments on commit 68e103e

Please sign in to comment.