diff --git a/x-pack/plugins/ingest_manager/common/types/models/agent.ts b/x-pack/plugins/ingest_manager/common/types/models/agent.ts index 2b8a306577e7d2..a204373fe2e56b 100644 --- a/x-pack/plugins/ingest_manager/common/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/models/agent.ts @@ -21,7 +21,8 @@ export type AgentStatus = | 'unenrolling' | 'degraded'; -export type AgentActionType = 'CONFIG_CHANGE' | 'DATA_DUMP' | 'RESUME' | 'PAUSE' | 'UNENROLL'; +export type AgentActionType = 'CONFIG_CHANGE' | 'UNENROLL'; + export interface NewAgentAction { type: AgentActionType; data?: any; @@ -29,20 +30,44 @@ export interface NewAgentAction { } export interface AgentAction extends NewAgentAction { + type: AgentActionType; + data?: any; + sent_at?: string; id: string; agent_id: string; created_at: string; + ack_data?: any; +} + +export interface AgentPolicyAction extends NewAgentAction { + id: string; + type: AgentActionType; + data?: any; + policy_id: string; + policy_revision: number; + created_at: string; + ack_data?: any; } -export interface AgentActionSOAttributes { +interface CommonAgentActionSOAttributes { type: AgentActionType; sent_at?: string; timestamp?: string; created_at: string; - agent_id: string; data?: string; + ack_data?: string; } +export type AgentActionSOAttributes = CommonAgentActionSOAttributes & { + agent_id: string; +}; +export type AgentPolicyActionSOAttributes = CommonAgentActionSOAttributes & { + policy_id: string; + policy_revision: number; +}; + +export type BaseAgentActionSOAttributes = AgentActionSOAttributes | AgentPolicyActionSOAttributes; + export interface NewAgentEvent { type: 'STATE' | 'ERROR' | 'ACTION_RESULT' | 'ACTION'; subtype: // State diff --git a/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts b/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts index cf8d3ab1c908ac..54cdeade3764e0 100644 --- a/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts +++ b/x-pack/plugins/ingest_manager/common/types/rest_spec/agent.ts @@ -7,11 +7,11 @@ import { Agent, AgentAction, + NewAgentAction, NewAgentEvent, AgentEvent, AgentStatus, AgentType, - NewAgentAction, } from '../models'; export interface GetAgentsRequest { diff --git a/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts b/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts index b81d44c40f8eb8..12a0956b791550 100644 --- a/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts +++ b/x-pack/plugins/ingest_manager/server/routes/agent/actions_handlers.ts @@ -10,7 +10,6 @@ import { RequestHandler } from 'kibana/server'; import { TypeOf } from '@kbn/config-schema'; import { PostNewAgentActionRequestSchema } from '../../types/rest_spec'; import { ActionsService } from '../../services/agents'; -import { NewAgentAction } from '../../../common/types/models'; import { PostNewAgentActionResponse } from '../../../common/types/rest_spec'; export const postNewAgentActionHandlerBuilder = function ( @@ -26,7 +25,7 @@ export const postNewAgentActionHandlerBuilder = function ( const agent = await actionsService.getAgent(soClient, request.params.agentId); - const newAgentAction = request.body.action as NewAgentAction; + const newAgentAction = request.body.action; const savedAgentAction = await actionsService.createAgentAction(soClient, { created_at: new Date().toISOString(), diff --git a/x-pack/plugins/ingest_manager/server/saved_objects/index.ts b/x-pack/plugins/ingest_manager/server/saved_objects/index.ts index aff8e607622d47..e86f7b24e2c784 100644 --- a/x-pack/plugins/ingest_manager/server/saved_objects/index.ts +++ b/x-pack/plugins/ingest_manager/server/saved_objects/index.ts @@ -98,8 +98,11 @@ const savedObjectTypes: { [key: string]: SavedObjectsType } = { mappings: { properties: { agent_id: { type: 'keyword' }, + policy_id: { type: 'keyword' }, + policy_revision: { type: 'integer' }, type: { type: 'keyword' }, data: { type: 'binary' }, + ack_data: { type: 'text' }, sent_at: { type: 'date' }, created_at: { type: 'date' }, }, diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts index a03a3b7f59fba3..938cfb43516303 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy.ts @@ -21,7 +21,7 @@ import { ListWithKuery, } from '../types'; import { DeleteAgentPolicyResponse, storedPackagePoliciesToAgentInputs } from '../../common'; -import { listAgents } from './agents'; +import { createAgentPolicyAction, listAgents } from './agents'; import { packagePolicyService } from './package_policy'; import { outputService } from './output'; import { agentPolicyUpdateEventHandler } from './agent_policy_update'; @@ -67,6 +67,10 @@ class AgentPolicyService { updated_by: user ? user.username : 'system', }); + if (options.bumpRevision) { + await this.triggerAgentPolicyUpdatedEvent(soClient, 'updated', id); + } + return (await this.get(soClient, id)) as AgentPolicy; } @@ -383,6 +387,32 @@ class AgentPolicyService { }; } + public async createFleetPolicyChangeAction( + soClient: SavedObjectsClientContract, + agentPolicyId: string + ) { + const policy = await agentPolicyService.getFullAgentPolicy(soClient, agentPolicyId); + if (!policy || !policy.revision) { + return; + } + const packages = policy.inputs.reduce((acc, input) => { + const packageName = input.meta?.package?.name; + if (packageName && acc.indexOf(packageName) < 0) { + acc.push(packageName); + } + return acc; + }, []); + + await createAgentPolicyAction(soClient, { + type: 'CONFIG_CHANGE', + data: { config: policy } as any, + ack_data: { packages }, + created_at: new Date().toISOString(), + policy_id: policy.id, + policy_revision: policy.revision, + }); + } + public async getFullAgentPolicy( soClient: SavedObjectsClientContract, id: string, diff --git a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts index 3c743dd957f62e..ff20e25e5bf0d9 100644 --- a/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts +++ b/x-pack/plugins/ingest_manager/server/services/agent_policy_update.ts @@ -8,6 +8,7 @@ import { SavedObjectsClientContract } from 'src/core/server'; import { generateEnrollmentAPIKey, deleteEnrollmentApiKeyForAgentPolicyId } from './api_keys'; import { unenrollForAgentPolicyId } from './agents'; import { outputService } from './output'; +import { agentPolicyService } from './agent_policy'; export async function agentPolicyUpdateEventHandler( soClient: SavedObjectsClientContract, @@ -15,8 +16,9 @@ export async function agentPolicyUpdateEventHandler( agentPolicyId: string ) { const adminUser = await outputService.getAdminUser(soClient); - // If no admin user fleet is not enabled just skip this hook - if (!adminUser) { + const outputId = await outputService.getDefaultOutputId(soClient); + // If no admin user and no default output fleet is not enabled just skip this hook + if (!adminUser || !outputId) { return; } @@ -24,6 +26,11 @@ export async function agentPolicyUpdateEventHandler( await generateEnrollmentAPIKey(soClient, { agentPolicyId, }); + await agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicyId); + } + + if (action === 'updated') { + await agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicyId); } if (action === 'deleted') { diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts index 80fdc305d0ba7f..866aa587b8a56a 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.test.ts @@ -6,45 +6,19 @@ import Boom from 'boom'; import { SavedObjectsBulkResponse } from 'kibana/server'; import { savedObjectsClientMock } from 'src/core/server/mocks'; -import { encryptedSavedObjectsMock } from '../../../../../plugins/encrypted_saved_objects/server/mocks'; import { Agent, - AgentAction, AgentActionSOAttributes, + BaseAgentActionSOAttributes, AgentEvent, } from '../../../common/types/models'; import { AGENT_TYPE_PERMANENT, AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; import { acknowledgeAgentActions } from './acks'; -import { appContextService } from '../app_context'; -import { IngestManagerAppContext } from '../../plugin'; describe('test agent acks services', () => { it('should succeed on valid and matched actions', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - }, - }) - ); mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ @@ -65,7 +39,7 @@ describe('test agent acks services', () => { } as SavedObjectsBulkResponse) ); - const agentActions = await acknowledgeAgentActions( + await acknowledgeAgentActions( mockSavedObjectsClient, ({ id: 'id', @@ -81,125 +55,32 @@ describe('test agent acks services', () => { } as AgentEvent, ] ); - expect(agentActions).toEqual([ - ({ - type: 'CONFIG_CHANGE', - id: 'action1', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - } as unknown) as AgentAction, - ]); }); it('should update config field on the agent if a policy change is acknowledged', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - data: JSON.stringify({ - config: { - id: 'policy1', - revision: 4, - settings: { - monitoring: { - enabled: true, - use_output: 'default', - logs: true, - metrics: true, - }, - }, - outputs: { - default: { - type: 'elasticsearch', - hosts: ['http://localhost:9200'], - }, - }, - inputs: [ - { - id: 'f2293360-b57c-11ea-8bd3-7bd51e425399', - name: 'system-1', - type: 'logs', - use_output: 'default', - meta: { - package: { - name: 'system', - version: '0.3.0', - }, - }, - dataset: { - namespace: 'default', - }, - streams: [ - { - id: 'logs-system.syslog', - dataset: { - name: 'system.syslog', - }, - paths: ['/var/log/messages*', '/var/log/syslog*'], - exclude_files: ['.gz$'], - multiline: { - pattern: '^\\s', - match: 'after', - }, - processors: [ - { - add_locale: null, - }, - { - add_fields: { - target: '', - fields: { - 'ecs.version': '1.5.0', - }, - }, - }, - ], - }, - ], - }, - ], - }, - }), - }, - }) - ); + const actionAttributes = { + type: 'CONFIG_CHANGE', + policy_id: 'policy1', + policy_revision: 4, + sent_at: '2020-03-14T19:45:02.620Z', + timestamp: '2019-01-04T14:32:03.36764-05:00', + created_at: '2020-03-14T19:45:02.620Z', + ack_data: JSON.stringify({ packages: ['system'] }), + }; mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action2', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - }, + attributes: actionAttributes, }, ], - } as SavedObjectsBulkResponse) + } as SavedObjectsBulkResponse) ); await acknowledgeAgentActions( @@ -214,13 +95,13 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action2', agent_id: 'id', } as AgentEvent, ] ); expect(mockSavedObjectsClient.bulkUpdate).toBeCalled(); - expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(2); + expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(1); expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0][0]).toMatchInlineSnapshot(` Object { "attributes": Object { @@ -237,111 +118,25 @@ describe('test agent acks services', () => { it('should not update config field on the agent if a policy change for an old revision is acknowledged', async () => { const mockSavedObjectsClient = savedObjectsClientMock.create(); - const mockStartEncryptedSOPlugin = encryptedSavedObjectsMock.createStart(); - appContextService.start(({ - encryptedSavedObjectsStart: mockStartEncryptedSOPlugin, - } as unknown) as IngestManagerAppContext); - - const [ - { value: mockStartEncryptedSOClient }, - ] = mockStartEncryptedSOPlugin.getClient.mock.results; - - mockStartEncryptedSOClient.getDecryptedAsInternalUser.mockReturnValue( - Promise.resolve({ - id: 'action1', - references: [], - type: AGENT_ACTION_SAVED_OBJECT_TYPE, - attributes: { - type: 'CONFIG_CHANGE', - agent_id: 'id', - sent_at: '2020-03-14T19:45:02.620Z', - timestamp: '2019-01-04T14:32:03.36764-05:00', - created_at: '2020-03-14T19:45:02.620Z', - data: JSON.stringify({ - config: { - id: 'policy1', - revision: 4, - settings: { - monitoring: { - enabled: true, - use_output: 'default', - logs: true, - metrics: true, - }, - }, - outputs: { - default: { - type: 'elasticsearch', - hosts: ['http://localhost:9200'], - }, - }, - inputs: [ - { - id: 'f2293360-b57c-11ea-8bd3-7bd51e425399', - name: 'system-1', - type: 'logs', - use_output: 'default', - meta: { - package: { - name: 'system', - version: '0.3.0', - }, - }, - dataset: { - namespace: 'default', - }, - streams: [ - { - id: 'logs-system.syslog', - dataset: { - name: 'system.syslog', - }, - paths: ['/var/log/messages*', '/var/log/syslog*'], - exclude_files: ['.gz$'], - multiline: { - pattern: '^\\s', - match: 'after', - }, - processors: [ - { - add_locale: null, - }, - { - add_fields: { - target: '', - fields: { - 'ecs.version': '1.5.0', - }, - }, - }, - ], - }, - ], - }, - ], - }, - }), - }, - }) - ); mockSavedObjectsClient.bulkGet.mockReturnValue( Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action3', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { type: 'CONFIG_CHANGE', - agent_id: 'id', sent_at: '2020-03-14T19:45:02.620Z', timestamp: '2019-01-04T14:32:03.36764-05:00', created_at: '2020-03-14T19:45:02.620Z', + policy_id: 'policy1', + policy_revision: 99, }, }, ], - } as SavedObjectsBulkResponse) + } as SavedObjectsBulkResponse) ); await acknowledgeAgentActions( @@ -357,13 +152,13 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action3', agent_id: 'id', } as AgentEvent, ] ); expect(mockSavedObjectsClient.bulkUpdate).toBeCalled(); - expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(1); + expect(mockSavedObjectsClient.bulkUpdate.mock.calls[0][0]).toHaveLength(0); }); it('should fail for actions that cannot be found on agent actions list', async () => { @@ -372,7 +167,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action4', error: { message: 'Not found', statusCode: 404, @@ -394,7 +189,7 @@ describe('test agent acks services', () => { type: 'ACTION_RESULT', subtype: 'CONFIG', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action2', + action_id: 'action4', agent_id: 'id', } as unknown) as AgentEvent, ] @@ -412,7 +207,7 @@ describe('test agent acks services', () => { Promise.resolve({ saved_objects: [ { - id: 'action1', + id: 'action5', references: [], type: AGENT_ACTION_SAVED_OBJECT_TYPE, attributes: { @@ -439,7 +234,7 @@ describe('test agent acks services', () => { type: 'ACTION', subtype: 'FAILED', timestamp: '2019-01-04T14:32:03.36764-05:00', - action_id: 'action1', + action_id: 'action5', agent_id: 'id', } as unknown) as AgentEvent, ] diff --git a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts index 87572ce405ee70..d29dfcec7ef307 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/acks.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/acks.ts @@ -11,14 +11,15 @@ import { SavedObjectsClientContract, } from 'src/core/server'; import Boom from 'boom'; +import LRU from 'lru-cache'; import { Agent, AgentAction, + AgentPolicyAction, AgentEvent, AgentEventSOAttributes, AgentSOAttributes, AgentActionSOAttributes, - FullAgentPolicy, } from '../../types'; import { AGENT_EVENT_SAVED_OBJECT_TYPE, @@ -30,11 +31,20 @@ import { forceUnenrollAgent } from './unenroll'; const ALLOWED_ACKNOWLEDGEMENT_TYPE: string[] = ['ACTION_RESULT']; +const actionCache = new LRU({ + max: 20, + maxAge: 10 * 60 * 1000, // 10 minutes +}); + export async function acknowledgeAgentActions( soClient: SavedObjectsClientContract, agent: Agent, agentEvents: AgentEvent[] ): Promise { + if (agentEvents.length === 0) { + return []; + } + for (const agentEvent of agentEvents) { if (!isAllowedType(agentEvent.type)) { throw Boom.badRequest(`${agentEvent.type} not allowed for acknowledgment only ACTION_RESULT`); @@ -45,9 +55,9 @@ export async function acknowledgeAgentActions( .map((event) => event.action_id) .filter((actionId) => actionId !== undefined) as string[]; - let actions; + let actions: AgentAction[]; try { - actions = await getAgentActionByIds(soClient, actionIds); + actions = await fetchActionsUsingCache(soClient, actionIds); } catch (error) { if (Boom.isBoom(error) && error.output.statusCode === 404) { throw Boom.badRequest(`One or more actions cannot be found`); @@ -55,65 +65,91 @@ export async function acknowledgeAgentActions( throw error; } + const agentActionsIds: string[] = []; for (const action of actions) { - if (action.agent_id !== agent.id) { + if (action.agent_id) { + agentActionsIds.push(action.id); + } + if (action.agent_id && action.agent_id !== agent.id) { throw Boom.badRequest(`${action.id} not found`); } } - if (actions.length === 0) { - return []; - } - const isAgentUnenrolled = actions.some((action) => action.type === 'UNENROLL'); if (isAgentUnenrolled) { await forceUnenrollAgent(soClient, agent.id); } - const agentPolicy = getLatestAgentPolicyIfUpdated(agent, actions); + const configChangeAction = getLatestConfigChangePolicyActionIfUpdated(agent, actions); await soClient.bulkUpdate([ - ...(agentPolicy ? [buildUpdateAgentPolicy(agent.id, agentPolicy)] : []), - ...buildUpdateAgentActionSentAt(actionIds), + ...(configChangeAction + ? [ + { + type: AGENT_SAVED_OBJECT_TYPE, + id: agent.id, + attributes: { + policy_revision: configChangeAction.policy_revision, + packages: configChangeAction?.ack_data?.packages, + }, + }, + ] + : []), + ...buildUpdateAgentActionSentAt(agentActionsIds), ]); return actions; } -function getLatestAgentPolicyIfUpdated(agent: Agent, actions: AgentAction[]) { - return actions.reduce((acc, action) => { - if (action.type !== 'CONFIG_CHANGE') { - return acc; - } - const data = action.data || {}; +async function fetchActionsUsingCache( + soClient: SavedObjectsClientContract, + actionIds: string[] +): Promise { + const missingActionIds: string[] = []; + const actions = actionIds + .map((actionId) => { + const action = actionCache.get(actionId); + if (!action) { + missingActionIds.push(actionId); + } + return action; + }) + .filter((action): action is AgentAction => action !== undefined); + + if (missingActionIds.length === 0) { + return actions; + } - if (data?.config?.id !== agent.policy_id) { - return acc; - } + const freshActions = await getAgentActionByIds(soClient, actionIds, false); + freshActions.forEach((action) => actionCache.set(action.id, action)); - const currentRevision = (acc && acc.revision) || agent.policy_revision || 0; + return [...freshActions, ...actions]; +} - return data?.config?.revision > currentRevision ? data?.config : acc; - }, null); +function isAgentPolicyAction(action: AgentAction | AgentPolicyAction): action is AgentPolicyAction { + return (action as AgentPolicyAction).policy_id !== undefined; } -function buildUpdateAgentPolicy(agentId: string, agentPolicy: FullAgentPolicy) { - const packages = agentPolicy.inputs.reduce((acc, input) => { - const packageName = input.meta?.package?.name; - if (packageName && acc.indexOf(packageName) < 0) { - return [packageName, ...acc]; +function getLatestConfigChangePolicyActionIfUpdated( + agent: Agent, + actions: Array +): AgentPolicyAction | null { + return actions.reduce((acc, action) => { + if ( + !isAgentPolicyAction(action) || + action.type !== 'CONFIG_CHANGE' || + action.policy_id !== agent.policy_id || + (acc?.policy_revision ?? 0) < (agent.policy_revision || 0) + ) { + return acc; } - return acc; - }, []); - return { - type: AGENT_SAVED_OBJECT_TYPE, - id: agentId, - attributes: { - policy_revision: agentPolicy.revision, - packages, - }, - }; + if (action.policy_revision > (acc?.policy_revision ?? 0)) { + return action; + } + + return acc; + }, null); } function buildUpdateAgentActionSentAt( diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts index c7390079523893..bcb3fc7fdc7bd4 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.test.ts @@ -22,7 +22,13 @@ describe('test agent actions services', () => { }; mockSavedObjectsClient.create.mockReturnValue( Promise.resolve({ - attributes: {}, + attributes: { + agent_id: 'agentid', + type: 'CONFIG_CHANGE', + data: JSON.stringify({ content: 'data' }), + sent_at: '2020-03-14T19:45:02.620Z', + created_at: '2020-03-14T19:45:02.620Z', + }, } as SavedObject) ); await createAgentAction(mockSavedObjectsClient, newAgentAction); diff --git a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts index cd0dd921312306..8519714334986d 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/actions.ts @@ -5,9 +5,20 @@ */ import { SavedObjectsClientContract } from 'kibana/server'; -import { Agent, AgentAction, AgentActionSOAttributes } from '../../../common/types/models'; +import { + Agent, + AgentAction, + AgentPolicyAction, + BaseAgentActionSOAttributes, + AgentActionSOAttributes, + AgentPolicyActionSOAttributes, +} from '../../../common/types/models'; import { AGENT_ACTION_SAVED_OBJECT_TYPE } from '../../../common/constants'; -import { savedObjectToAgentAction } from './saved_objects'; +import { + isAgentActionSavedObject, + isPolicyActionSavedObject, + savedObjectToAgentAction, +} from './saved_objects'; import { appContextService } from '../app_context'; import { nodeTypes } from '../../../../../../src/plugins/data/common'; @@ -15,15 +26,45 @@ export async function createAgentAction( soClient: SavedObjectsClientContract, newAgentAction: Omit ): Promise { - const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { + return createAction(soClient, newAgentAction); +} + +export function createAgentPolicyAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise { + return createAction(soClient, newAgentAction); +} +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise; +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit +): Promise; +async function createAction( + soClient: SavedObjectsClientContract, + newAgentAction: Omit | Omit +): Promise { + const so = await soClient.create(AGENT_ACTION_SAVED_OBJECT_TYPE, { ...newAgentAction, data: newAgentAction.data ? JSON.stringify(newAgentAction.data) : undefined, + ack_data: newAgentAction.ack_data ? JSON.stringify(newAgentAction.ack_data) : undefined, }); - const agentAction = savedObjectToAgentAction(so); - agentAction.data = newAgentAction.data; + if (isAgentActionSavedObject(so)) { + const agentAction = savedObjectToAgentAction(so); + agentAction.data = newAgentAction.data; + + return agentAction; + } else if (isPolicyActionSavedObject(so)) { + const agentAction = savedObjectToAgentAction(so); + agentAction.data = newAgentAction.data; - return agentAction; + return agentAction; + } + throw new Error('Invalid action'); } export async function getAgentActionsForCheckin( @@ -67,7 +108,8 @@ export async function getAgentActionsForCheckin( export async function getAgentActionByIds( soClient: SavedObjectsClientContract, - actionIds: string[] + actionIds: string[], + decryptData: boolean = true ) { const actions = ( await soClient.bulkGet( @@ -76,7 +118,11 @@ export async function getAgentActionByIds( type: AGENT_ACTION_SAVED_OBJECT_TYPE, })) ) - ).saved_objects.map(savedObjectToAgentAction); + ).saved_objects.map((action) => savedObjectToAgentAction(action)); + + if (!decryptData) { + return actions; + } return Promise.all( actions.map(async (action) => { @@ -93,6 +139,39 @@ export async function getAgentActionByIds( ); } +export async function getAgentPolicyActionByIds( + soClient: SavedObjectsClientContract, + actionIds: string[], + decryptData: boolean = true +) { + const actions = ( + await soClient.bulkGet( + actionIds.map((actionId) => ({ + id: actionId, + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + })) + ) + ).saved_objects.map((action) => savedObjectToAgentAction(action)); + + if (!decryptData) { + return actions; + } + + return Promise.all( + actions.map(async (action) => { + // Get decrypted actions + return savedObjectToAgentAction( + await appContextService + .getEncryptedSavedObjects() + .getDecryptedAsInternalUser( + AGENT_ACTION_SAVED_OBJECT_TYPE, + action.id + ) + ); + }) + ); +} + export async function getNewActionsSince(soClient: SavedObjectsClientContract, timestamp: string) { const filter = nodeTypes.function.buildNode('and', [ nodeTypes.function.buildNode( @@ -116,7 +195,26 @@ export async function getNewActionsSince(soClient: SavedObjectsClientContract, t filter, }); - return res.saved_objects.map(savedObjectToAgentAction); + return res.saved_objects + .filter(isAgentActionSavedObject) + .map((so) => savedObjectToAgentAction(so)); +} + +export async function getLatestConfigChangeAction( + soClient: SavedObjectsClientContract, + policyId: string +) { + const res = await soClient.find({ + type: AGENT_ACTION_SAVED_OBJECT_TYPE, + search: policyId, + searchFields: ['policy_id'], + sortField: 'created_at', + sortOrder: 'DESC', + }); + + if (res.saved_objects[0]) { + return savedObjectToAgentAction(res.saved_objects[0]); + } } export interface ActionsService { @@ -124,6 +222,6 @@ export interface ActionsService { createAgentAction: ( soClient: SavedObjectsClientContract, - newAgentAction: AgentActionSOAttributes + newAgentAction: Omit ) => Promise; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index eddfb0e64b84b2..8f586420c3ecb7 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -5,6 +5,7 @@ */ import { timer, from, Observable, TimeoutError } from 'rxjs'; +import { omit } from 'lodash'; import { shareReplay, distinctUntilKeyChanged, @@ -16,14 +17,7 @@ import { take, } from 'rxjs/operators'; import { SavedObjectsClientContract, KibanaRequest } from 'src/core/server'; -import { - Agent, - AgentAction, - AgentSOAttributes, - AgentPolicy, - FullAgentPolicy, -} from '../../../types'; -import { agentPolicyService } from '../../agent_policy'; +import { Agent, AgentAction, AgentPolicyAction, AgentSOAttributes } from '../../../types'; import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, @@ -31,7 +25,11 @@ import { AGENT_POLICY_ROLLOUT_RATE_LIMIT_INTERVAL_MS, AGENT_POLICY_ROLLOUT_RATE_LIMIT_REQUEST_PER_INTERVAL, } from '../../../constants'; -import { createAgentAction, getNewActionsSince } from '../actions'; +import { + getNewActionsSince, + getLatestConfigChangeAction, + getAgentPolicyActionByIds, +} from '../actions'; import { appContextService } from '../../app_context'; import { toPromiseAbortable, AbortError, createRateLimiter } from './rxjs_utils'; @@ -54,27 +52,27 @@ function getInternalUserSOClient() { return appContextService.getInternalUserSOClient(fakeRequest); } -function createAgentPolicySharedObservable(agentPolicyId: string) { +function createNewActionsSharedObservable(): Observable { const internalSOClient = getInternalUserSOClient(); + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( - switchMap(() => - from(agentPolicyService.get(internalSOClient, agentPolicyId) as Promise) - ), - distinctUntilKeyChanged('revision'), - switchMap((data) => - from(agentPolicyService.getFullAgentPolicy(internalSOClient, agentPolicyId)) - ), + switchMap(() => { + return from(getNewActionsSince(internalSOClient, new Date().toISOString())); + }), shareReplay({ refCount: true, bufferSize: 1 }) ); } -function createNewActionsSharedObservable(): Observable { - return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( - switchMap(() => { - const internalSOClient = getInternalUserSOClient(); +function createAgentPolicyActionSharedObservable(agentPolicyId: string) { + const internalSOClient = getInternalUserSOClient(); - return from(getNewActionsSince(internalSOClient, new Date().toISOString())); - }), + return timer(0, AGENT_UPDATE_ACTIONS_INTERVAL_MS).pipe( + switchMap(() => from(getLatestConfigChangeAction(internalSOClient, agentPolicyId))), + filter((data): data is AgentPolicyAction => data !== undefined), + distinctUntilKeyChanged('id'), + switchMap((data) => + from(getAgentPolicyActionByIds(internalSOClient, [data.id]).then((r) => r[0])) + ), shareReplay({ refCount: true, bufferSize: 1 }) ); } @@ -102,47 +100,35 @@ async function getOrCreateAgentDefaultOutputAPIKey( return outputAPIKey.key; } -function shouldCreateAgentPolicyAction(agent: Agent, agentPolicy: FullAgentPolicy | null): boolean { - if (!agentPolicy || !agentPolicy.revision) { - return false; - } - const isAgentPolicyOutdated = - !agent.policy_revision || agent.policy_revision < agentPolicy.revision; - if (!isAgentPolicyOutdated) { - return false; - } - - return true; -} - -async function createAgentActionFromAgentPolicy( +async function createAgentActionFromPolicyAction( soClient: SavedObjectsClientContract, agent: Agent, - policy: FullAgentPolicy | null + policyAction: AgentPolicyAction ) { - // Deep clone !not supporting Date, and undefined value. - const newAgentPolicy = JSON.parse(JSON.stringify(policy)); + const newAgentAction: AgentAction = Object.assign( + omit( + // Faster than clone + JSON.parse(JSON.stringify(policyAction)) as AgentPolicyAction, + 'policy_id', + 'policy_revision' + ), + { + agent_id: agent.id, + } + ); // Mutate the policy to set the api token for this agent - newAgentPolicy.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( + newAgentAction.data.config.outputs.default.api_key = await getOrCreateAgentDefaultOutputAPIKey( soClient, agent ); - const policyChangeAction = await createAgentAction(soClient, { - agent_id: agent.id, - type: 'CONFIG_CHANGE', - data: { config: newAgentPolicy } as any, - created_at: new Date().toISOString(), - sent_at: undefined, - }); - - return [policyChangeAction]; + return [newAgentAction]; } export function agentCheckinStateNewActionsFactory() { // Shared Observables - const agentPolicies$ = new Map>(); + const agentPolicies$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); // Rx operators const rateLimiter = createRateLimiter( @@ -162,7 +148,7 @@ export function agentCheckinStateNewActionsFactory() { } const agentPolicyId = agent.policy_id; if (!agentPolicies$.has(agentPolicyId)) { - agentPolicies$.set(agentPolicyId, createAgentPolicySharedObservable(agentPolicyId)); + agentPolicies$.set(agentPolicyId, createAgentPolicyActionSharedObservable(agentPolicyId)); } const agentPolicy$ = agentPolicies$.get(agentPolicyId); if (!agentPolicy$) { @@ -174,15 +160,22 @@ export function agentCheckinStateNewActionsFactory() { // Set a timeout 3s before the real timeout to have a chance to respond an empty response before socket timeout Math.max((appContextService.getConfig()?.fleet.pollingRequestTimeout ?? 0) - 3000, 3000) ), - filter((agentPolicy) => shouldCreateAgentPolicyAction(agent, agentPolicy)), + filter( + (action) => + agent.policy_id !== undefined && + action.policy_revision !== undefined && + action.policy_id !== undefined && + action.policy_id === agent.policy_id && + (!agent.policy_revision || action.policy_revision > agent.policy_revision) + ), rateLimiter(), - mergeMap((agentPolicy) => createAgentActionFromAgentPolicy(soClient, agent, agentPolicy)), + mergeMap((policyAction) => createAgentActionFromPolicyAction(soClient, agent, policyAction)), merge(newActions$), mergeMap(async (data) => { if (!data) { return; } - const newActions = data.filter((action) => action.agent_id); + const newActions = data.filter((action) => action.agent_id === agent.id); if (newActions.length === 0) { return; } diff --git a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts index 2ab5cc8139f69f..3ae664c086da99 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/saved_objects.ts @@ -6,7 +6,15 @@ import Boom from 'boom'; import { SavedObject } from 'src/core/server'; -import { Agent, AgentSOAttributes, AgentAction, AgentActionSOAttributes } from '../../types'; +import { + Agent, + AgentSOAttributes, + AgentAction, + AgentPolicyAction, + AgentActionSOAttributes, + AgentPolicyActionSOAttributes, + BaseAgentActionSOAttributes, +} from '../../types'; export function savedObjectToAgent(so: SavedObject): Agent { if (so.error) { @@ -27,7 +35,13 @@ export function savedObjectToAgent(so: SavedObject): Agent { }; } -export function savedObjectToAgentAction(so: SavedObject): AgentAction { +export function savedObjectToAgentAction(so: SavedObject): AgentAction; +export function savedObjectToAgentAction( + so: SavedObject +): AgentPolicyAction; +export function savedObjectToAgentAction( + so: SavedObject +): AgentAction | AgentPolicyAction { if (so.error) { if (so.error.statusCode === 404) { throw Boom.notFound(so.error.message); @@ -36,9 +50,42 @@ export function savedObjectToAgentAction(so: SavedObject +): so is SavedObject { + return (so.attributes as AgentActionSOAttributes).agent_id !== undefined; +} + +export function isPolicyActionSavedObject( + so: SavedObject +): so is SavedObject { + return (so.attributes as AgentPolicyActionSOAttributes).policy_id !== undefined; +} diff --git a/x-pack/plugins/ingest_manager/server/services/setup.ts b/x-pack/plugins/ingest_manager/server/services/setup.ts index ec3a05a4fa390d..f02057bae15981 100644 --- a/x-pack/plugins/ingest_manager/server/services/setup.ts +++ b/x-pack/plugins/ingest_manager/server/services/setup.ts @@ -170,6 +170,12 @@ export async function setupFleet( }); }) ); + + await Promise.all( + agentPolicies.map((agentPolicy) => + agentPolicyService.createFleetPolicyChangeAction(soClient, agentPolicy.id) + ) + ); } function generateRandomPassword() { diff --git a/x-pack/plugins/ingest_manager/server/types/index.tsx b/x-pack/plugins/ingest_manager/server/types/index.tsx index 2746dfcd00ce37..d00491afef72b1 100644 --- a/x-pack/plugins/ingest_manager/server/types/index.tsx +++ b/x-pack/plugins/ingest_manager/server/types/index.tsx @@ -16,7 +16,10 @@ export { AgentEvent, AgentEventSOAttributes, AgentAction, + AgentPolicyAction, + BaseAgentActionSOAttributes, AgentActionSOAttributes, + AgentPolicyActionSOAttributes, PackagePolicy, PackagePolicyInput, PackagePolicyInputStream, diff --git a/x-pack/plugins/ingest_manager/server/types/models/agent.ts b/x-pack/plugins/ingest_manager/server/types/models/agent.ts index 5ad98cfd406226..b249705fe6c2fd 100644 --- a/x-pack/plugins/ingest_manager/server/types/models/agent.ts +++ b/x-pack/plugins/ingest_manager/server/types/models/agent.ts @@ -62,12 +62,7 @@ export const AgentEventSchema = schema.object({ }); export const NewAgentActionSchema = schema.object({ - type: schema.oneOf([ - schema.literal('CONFIG_CHANGE'), - schema.literal('DATA_DUMP'), - schema.literal('RESUME'), - schema.literal('PAUSE'), - ]), + type: schema.oneOf([schema.literal('CONFIG_CHANGE'), schema.literal('UNENROLL')]), data: schema.maybe(schema.any()), sent_at: schema.maybe(schema.string()), }); diff --git a/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts b/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts index 2b4bb335dfc5c9..68e02933f56500 100644 --- a/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts +++ b/x-pack/test/ingest_manager_api_integration/apis/fleet/agents/actions.ts @@ -28,13 +28,12 @@ export default function (providerContext: FtrProviderContext) { action: { type: 'CONFIG_CHANGE', data: { data: 'action_data' }, - sent_at: '2020-03-18T19:45:02.620Z', }, }) .expect(200); + expect(apiResponse.item.type).to.eql('CONFIG_CHANGE'); expect(apiResponse.item.data).to.eql({ data: 'action_data' }); - expect(apiResponse.item.sent_at).to.be('2020-03-18T19:45:02.620Z'); }); it('should return a 400 when request does not have type information', async () => {