Skip to content

Commit

Permalink
[EDR Workflows][Osquery] Use newly added action responses data stream (
Browse files Browse the repository at this point in the history
…#183892)

**Prerequisite**: elastic/elasticsearch#108849

**Follow-up**: elastic/integrations#9661

This PR introduces a new index
`logs-osquery_manager.action.responses-default` for action responses.
This index will be added in Osquery Manager integration version `1.12`
and will replace the existing
`.logs-osquery_manager.action.responses-default`, which is currently
populated by a transform from `.fleet-actions`.

Since most users will still be using the old integration package, we
ensured that the implementation checks the old index first and returns
the response from there unless the new index is available. If the new
index is available, the response will come from it. This change ensures
compatibility with all user scenarios.
  • Loading branch information
szwarckonrad committed May 24, 2024
1 parent 09a165f commit 9bafd06
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 22 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/osquery/common/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const RESULTS_INDEX = `${OSQUERY_LOGS_BASE}.results`;
export const OSQUERY_ACTIONS_INDEX = `${ACTIONS_INDEX}-*`;

export const ACTION_RESPONSES_INDEX = `.logs-${OSQUERY_INTEGRATION_NAME}.action.responses`;
export const ACTION_RESPONSES_DATA_STREAM_INDEX = `logs-${OSQUERY_INTEGRATION_NAME}.action.responses`;

export const DEFAULT_PLATFORM = 'linux,windows,darwin';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,5 @@ export interface ActionResultsStrategyResponse
export interface ActionResultsRequestOptions extends RequestOptionsPaginated {
actionId: string;
startDate?: string;
useNewDataStream?: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import type { ISearchRequestParams } from '@kbn/search-types';
import { AGENT_ACTIONS_RESULTS_INDEX } from '@kbn/fleet-plugin/common';
import { isEmpty } from 'lodash';
import moment from 'moment';
import { ACTION_RESPONSES_INDEX } from '../../../../../../common/constants';
import {
ACTION_RESPONSES_DATA_STREAM_INDEX,
ACTION_RESPONSES_INDEX,
} from '../../../../../../common/constants';
import type { ActionResultsRequestOptions } from '../../../../../../common/search_strategy';
import { getQueryFilter } from '../../../../../utils/build_query';

Expand All @@ -19,6 +22,7 @@ export const buildActionResultsQuery = ({
startDate,
sort,
componentTemplateExists,
useNewDataStream,
}: ActionResultsRequestOptions): ISearchRequestParams => {
let filter = `action_id: ${actionId}`;
if (!isEmpty(kuery)) {
Expand All @@ -41,11 +45,18 @@ export const buildActionResultsQuery = ({

const filterQuery = [...timeRangeFilter, getQueryFilter({ filter })];

let index: string;
if (useNewDataStream) {
index = ACTION_RESPONSES_DATA_STREAM_INDEX;
} else if (componentTemplateExists) {
index = ACTION_RESPONSES_INDEX;
} else {
index = AGENT_ACTIONS_RESULTS_INDEX;
}

return {
allow_no_indices: true,
index: componentTemplateExists
? `${ACTION_RESPONSES_INDEX}-default*`
: `${AGENT_ACTIONS_RESULTS_INDEX}*`,
index,
ignore_unavailable: true,
body: {
aggs: {
Expand Down
81 changes: 63 additions & 18 deletions x-pack/plugins/osquery/server/search_strategy/osquery/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
* 2.0.
*/

import { from, map, mergeMap } from 'rxjs';
import { map, mergeMap, forkJoin, from, of } from 'rxjs';
import type { ISearchStrategy, PluginStart } from '@kbn/data-plugin/server';
import { shimHitsTotal } from '@kbn/data-plugin/server';
import { ENHANCED_ES_SEARCH_STRATEGY } from '@kbn/data-plugin/common';
import type { CoreStart } from '@kbn/core/server';
import { ACTIONS_INDEX } from '../../../common/constants';
import { ACTION_RESPONSES_DATA_STREAM_INDEX, ACTIONS_INDEX } from '../../../common/constants';
import type {
FactoryQueryTypes,
StrategyResponseType,
StrategyRequestType,
} from '../../../common/search_strategy/osquery';
import { OsqueryQueries } from '../../../common/search_strategy/osquery';
import { osqueryFactory } from './factory';
import type { OsqueryFactory } from './factory/types';

Expand All @@ -33,12 +34,15 @@ export const osquerySearchStrategyProvider = <T extends FactoryQueryTypes>(

const queryFactory: OsqueryFactory<T> = osqueryFactory[request.factoryQueryType];

return from(
esClient.asInternalUser.indices.exists({
return forkJoin({
actionsIndexExists: esClient.asInternalUser.indices.exists({
index: `${ACTIONS_INDEX}*`,
})
).pipe(
mergeMap((exists) => {
}),
newDataStreamIndexExists: esClient.asInternalUser.indices.exists({
index: `${ACTION_RESPONSES_DATA_STREAM_INDEX}*`,
}),
}).pipe(
mergeMap(({ actionsIndexExists, newDataStreamIndexExists }) => {
const strictRequest = {
factoryQueryType: request.factoryQueryType,
kuery: request.kuery,
Expand All @@ -51,31 +55,72 @@ export const osquerySearchStrategyProvider = <T extends FactoryQueryTypes>(

const dsl = queryFactory.buildDsl({
...strictRequest,
componentTemplateExists: exists,
componentTemplateExists: actionsIndexExists,
} as StrategyRequestType<T>);
// use internal user for searching .fleet* indices
es =
dsl.index?.includes('fleet') || dsl.index?.includes('logs-osquery_manager.action')
? data.search.searchAsInternalUser
: data.search.getSearchStrategy(ENHANCED_ES_SEARCH_STRATEGY);

return es.search(
const searchLegacyIndex$ = es.search(
{
...strictRequest,
params: dsl,
},
options,
deps
);
}),
map((response) => ({
...response,
...{
rawResponse: shimHitsTotal(response.rawResponse, options),
},
total: response.rawResponse.hits.total as number,
})),
mergeMap((esSearchRes) => queryFactory.parse(request, esSearchRes))

// With the introduction of a new DS that sends data directly from an agent into the new index
// logs-osquery_manager.action.responses-default, instead of the old index .logs-osquery_manager.action.responses-default
// which was populated by a transform, we now need to check both places for results.
// The new index was introduced in integration package 1.12, so users running earlier versions won't have it.

return searchLegacyIndex$.pipe(
mergeMap((legacyIndexResponse) => {
if (
request.factoryQueryType === OsqueryQueries.actionResults &&
newDataStreamIndexExists
) {
const dataStreamDsl = queryFactory.buildDsl({
...strictRequest,
componentTemplateExists: actionsIndexExists,
useNewDataStream: true,
} as StrategyRequestType<T>);

return from(
es.search(
{
...strictRequest,
params: dataStreamDsl,
},
options,
deps
)
).pipe(
map((newDataStreamIndexResponse) => {
if (newDataStreamIndexResponse.rawResponse.hits.total) {
return newDataStreamIndexResponse;
}

return legacyIndexResponse;
})
);
}

return of(legacyIndexResponse);
}),
map((response) => ({
...response,
...{
rawResponse: shimHitsTotal(response.rawResponse, options),
},
total: response.rawResponse.hits.total as number,
})),
mergeMap((esSearchRes) => queryFactory.parse(request, esSearchRes))
);
})
);
},
cancel: async (id, options, deps) => {
Expand Down

0 comments on commit 9bafd06

Please sign in to comment.