Skip to content

Commit

Permalink
[APM] Fix condition to use serviceTransactionMetric documents (#180903
Browse files Browse the repository at this point in the history
)

closes #167578
(#167578 (comment))

## Summary

This PR fixes the condition under which `/time_range_metadata` returns
`hasDocs=true` for `serviceTransactionMetric`, so that the UI doesn't lose
information by querying `serviceTransactionMetric` docs in a range where
docs from APM < 8.7 still exist.


**IMPORTANT** ❗ 

Based on the APM Server
[changelog](https://github.com/elastic/apm-server/blob/bbf4eb276bee1deb29ad65a3ab79ac31a482f03d/changelogs/8.7.asciidoc#L71)
and this [issue](elastic/apm-server#9703),
`serviceTransactionMetric` doc was added in 8.7 as well as the
aggregations by `10m` and `60m` intervals for `transaction` docs. In
`8.7`, the `transaction.duration.summary` was also introduced, therefore
the new rollup interval docs will always contain the new field.

Based on that, the logic was changed to

- Verify whether `transaction` `10m` and `60m` are supported within a
given time range
- Verify whether or not `transaction.duration.summary` is supported for
`transaction` `1m` docs
- All other doc types and intervals are presupposed to support
`transaction.duration.summary`

### Range starting from when there is no data (contains docs generated
by APM < 8.7 and APM >= 8.7)

<img width="1210" alt="image"
src="https://github.com/elastic/kibana/assets/2767137/d7767d28-03ef-4f5a-b10c-b8529d9b0f69">


```json
{
    "isUsingServiceDestinationMetrics": false,
    "sources": [
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "1m",
            "hasDocs": false,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "10m",
            "hasDocs": false,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "60m",
            "hasDocs": false,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "1m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "10m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "60m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionEvent",
            "rollupInterval": "none",
            "hasDocs": true,
            "hasDurationSummaryField": false
        }
    ]
}
```

### Range starting from when first docs were ingested (contains docs
generated by APM < 8.7 and APM >= 8.7)

<img width="1208" alt="image"
src="https://github.com/elastic/kibana/assets/2767137/982f2359-fb20-4bf8-8d9a-afbb1cd5b0ea">


```json
{
    "isUsingServiceDestinationMetrics": false,
    "sources": [
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "1m",
            "hasDocs": false,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "10m",
            "hasDocs": false,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "60m",
            "hasDocs": false,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "1m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "10m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "60m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionEvent",
            "rollupInterval": "none",
            "hasDocs": true,
            "hasDurationSummaryField": false
        }
    ]
}
```

### Range starting from when only APM >= 8.7 docs exist
<img width="1198" alt="image"
src="https://github.com/elastic/kibana/assets/2767137/3a972457-3a0a-440d-85a1-add8a48f37d1">


```json
{
    "isUsingServiceDestinationMetrics": false,
    "sources": [
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "1m",
            "hasDocs": true,
            "hasDurationSummaryField": true
        },
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "10m",
            "hasDocs": true,
            "hasDurationSummaryField": true
        },
        {
            "documentType": "serviceTransactionMetric",
            "rollupInterval": "60m",
            "hasDocs": true,
            "hasDurationSummaryField": true
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "1m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "10m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionMetric",
            "rollupInterval": "60m",
            "hasDocs": true,
            "hasDurationSummaryField": false
        },
        {
            "documentType": "transactionEvent",
            "rollupInterval": "none",
            "hasDocs": true,
            "hasDurationSummaryField": false
        }
    ]
}
```
### How to test

Generate docs without ServiceTransactionMetrics
```bash
node scripts/synthtrace service_summary_field_version_dependent.ts  --versionOverride=8.6.2  --from=now-2h --to=now --clean
```

Generate docs with ServiceTransactionMetrics
```bash
node scripts/synthtrace service_summary_field_version_dependent.ts --from=now-1h --to=now  
```  

Follow the same steps from above.

---------

Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com>
  • Loading branch information
crespocarlos and kibanamachine committed May 7, 2024
1 parent a5ba2d6 commit 314d6ee
Show file tree
Hide file tree
Showing 10 changed files with 271 additions and 187 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
* Side Public License, v 1.
*/

import semver from 'semver';
import { PassThrough, pipeline, Readable } from 'stream';
import { getDedotTransform } from '../../../shared/get_dedot_transform';
import { getSerializeTransform } from '../../../shared/get_serialize_transform';
import { Logger } from '../../../utils/create_logger';
import { fork } from '../../../utils/stream_utils';
import { deleteSummaryFieldTransform } from '../../../utils/transform_helpers';
import { createBreakdownMetricsAggregator } from '../../aggregators/create_breakdown_metrics_aggregator';
import { createServiceMetricsAggregator } from '../../aggregators/create_service_metrics_aggregator';
import { createServiceSummaryMetricsAggregator } from '../../aggregators/create_service_summary_metrics_aggregator';
Expand All @@ -22,22 +24,32 @@ import { getRoutingTransform } from './get_routing_transform';

export function apmPipeline(logger: Logger, version: string, includeSerialization: boolean = true) {
return (base: Readable) => {
const continousRollupSupported =
!version || semver.gte(semver.coerce(version)?.version ?? version, '8.7.0');

const aggregators = [
createTransactionMetricsAggregator('1m'),
createTransactionMetricsAggregator('10m'),
createTransactionMetricsAggregator('60m'),
createServiceMetricsAggregator('1m'),
createServiceMetricsAggregator('10m'),
createServiceMetricsAggregator('60m'),
createServiceSummaryMetricsAggregator('1m'),
createServiceSummaryMetricsAggregator('10m'),
createServiceSummaryMetricsAggregator('60m'),
createSpanMetricsAggregator('1m'),
createSpanMetricsAggregator('10m'),
createSpanMetricsAggregator('60m'),
...(continousRollupSupported
? [
createTransactionMetricsAggregator('10m'),
createTransactionMetricsAggregator('60m'),
createServiceMetricsAggregator('1m'),
createServiceMetricsAggregator('10m'),
createServiceMetricsAggregator('60m'),
createServiceSummaryMetricsAggregator('1m'),
createServiceSummaryMetricsAggregator('10m'),
createServiceSummaryMetricsAggregator('60m'),
createSpanMetricsAggregator('10m'),
createSpanMetricsAggregator('60m'),
]
: []),
];

const serializationTransform = includeSerialization ? [getSerializeTransform()] : [];
const removeDurationSummaryTransform = !continousRollupSupported
? [deleteSummaryFieldTransform()]
: [];

return pipeline(
// @ts-expect-error Some weird stuff here with the type definition for pipeline. We have tests!
Expand All @@ -49,6 +61,7 @@ export function apmPipeline(logger: Logger, version: string, includeSerializatio
getApmServerMetadataTransform(version),
getRoutingTransform(),
getDedotTransform(),
...removeDurationSummaryTransform,
(err) => {
if (err) {
logger.error(err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,15 @@ export class ApmSynthtraceEsClient extends SynthtraceEsClient<ApmFields> {
this.logger.info(`Updated component template: ${name}`);
}

getDefaultPipeline(includeSerialization: boolean = true) {
return apmPipeline(this.logger, this.version, includeSerialization);
getDefaultPipeline(
{
includeSerialization,
versionOverride,
}: {
includeSerialization?: boolean;
versionOverride?: string;
} = { includeSerialization: true }
) {
return apmPipeline(this.logger, versionOverride ?? this.version, includeSerialization);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,44 +8,34 @@

import { ApmFields, apm } from '@kbn/apm-synthtrace-client';
import { random } from 'lodash';
import { pipeline, Readable } from 'stream';
import { Readable } from 'stream';
import semver from 'semver';
import { Scenario } from '../cli/scenario';
import {
addObserverVersionTransform,
deleteSummaryFieldTransform,
} from '../lib/utils/transform_helpers';
import { withClient } from '../lib/utils/with_client';
import { RunOptions } from '../cli/utils/parse_run_cli_flags';
import { Logger } from '../lib/utils/create_logger';

const scenario: Scenario<ApmFields> = async ({ logger, versionOverride }) => {
const scenario: Scenario<ApmFields> = async ({
logger,
versionOverride,
}: RunOptions & { logger: Logger }) => {
const version = versionOverride as string;
const isLegacy = versionOverride && semver.lt(version, '8.7.0');
const isLegacy = version ? semver.lt(version as string, '8.7.0') : false;
return {
bootstrap: async ({ apmEsClient }) => {
if (isLegacy) {
apmEsClient.pipeline((base: Readable) => {
const defaultPipeline = apmEsClient.getDefaultPipeline()(
base
) as unknown as NodeJS.ReadableStream;

return pipeline(
defaultPipeline,
addObserverVersionTransform(version),
deleteSummaryFieldTransform(),
(err) => {
if (err) {
logger.error(err);
}
}
);
return apmEsClient.getDefaultPipeline({
versionOverride: version,
})(base);
});
}
},
generate: ({ range, clients: { apmEsClient } }) => {
const successfulTimestamps = range.ratePerMinute(6);
const instance = apm
.service({
name: `java${isLegacy ? '-legacy' : ''}`,
name: `java`,
environment: 'production',
agentName: 'java',
})
Expand Down
2 changes: 1 addition & 1 deletion packages/kbn-journeys/services/synthtrace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async function initApmSynthtraceClient(options: SynthtraceClientOptions) {
version: packageVersion,
});

synthEsClient.pipeline(synthEsClient.getDefaultPipeline(false));
synthEsClient.pipeline(synthEsClient.getDefaultPipeline({ includeSerialization: false }));

return synthEsClient;
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ GET apm-*-metric-*,metrics-apm*/_search?terminate_after=1000

Service transaction metrics are aggregated metric documents that hold latency and throughput metrics pivoted by `service.name`, `service.environment` and `transaction.type`. Additionally, `agent.name` and `service.language.name` are included as metadata.

We use the response from the `GET /internal/apm/time_range_metadata` endpoint to determine what data source is available. A data source is considered available if there is either data before the current time range, or, if there is no data at all before the current time range, if there is data within the current time range. This means that existing deployments will use transaction metrics right after upgrading (instead of using service transaction metrics and seeing a mostly blank screen), but also that new deployments immediately get the benefits of service transaction metrics, instead of falling all the way back to transaction events.
We use the response from the `GET /internal/apm/time_range_metadata` endpoint to determine what data source is available. Service transaction metrics docs, introduced in APM >= 8.7, are considered available if there is data before *and* within the current time range. This ensures the UI won't miss information shipped by APM < 8.7. For < 8.7 documents, availability is determined by whether there is data before the current time range or, if there is no data at all before the current time range, whether there is data within the current time range. This means that existing deployments will use transaction metrics right after upgrading (instead of using service transaction metrics and seeing a mostly blank screen), but also that new deployments immediately get the benefits of service transaction metrics, instead of falling all the way back to transaction events.

A pre-aggregated document where `_doc_count` is the number of transaction events

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ export function setupNodeEvents(on: Cypress.PluginEvents, config: Cypress.Plugin
version: config.env.APM_PACKAGE_VERSION,
});

synthtraceEsClient.pipeline(synthtraceEsClient.getDefaultPipeline(false));
synthtraceEsClient.pipeline(
synthtraceEsClient.getDefaultPipeline({ includeSerialization: false })
);

initPlugin(on, config);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,13 @@ import { RollupInterval } from '../../../common/rollup';
import { APMEventClient } from './create_es_client/create_apm_event_client';
import { getConfigForDocumentType } from './create_es_client/document_type';
import { TimeRangeMetadata } from '../../../common/time_range_metadata';
import { getDurationLegacyFilter } from './transactions';
import { isDurationSummaryNotSupportedFilter } from './transactions';

const QUERY_INDEX = {
BEFORE: 0,
CURRENT: 1,
DURATION_SUMMARY: 2,
DOCUMENT_TYPE: 0,
DURATION_SUMMARY_NOT_SUPPORTED: 1,
} as const;

interface DocumentTypeData {
documentType: ApmDocumentType;
rollupInterval: RollupInterval;
hasDocBefore: boolean;
hasDocAfter: boolean;
allHaveDurationSummary: boolean;
}

const getRequest = ({
documentType,
rollupInterval,
Expand Down Expand Up @@ -93,10 +84,8 @@ export async function getDocumentSources({
documentTypesToCheck,
});

const hasAnySourceDocBefore = documentTypesInfo.some((source) => source.hasDocBefore);

return [
...mapToSources(documentTypesInfo, hasAnySourceDocBefore),
...documentTypesInfo,
{
documentType: ApmDocumentType.TransactionEvent,
rollupInterval: RollupInterval.None,
Expand All @@ -120,7 +109,7 @@ const getDocumentTypesInfo = async ({
kuery: string;
enableContinuousRollups: boolean;
documentTypesToCheck: ApmDocumentType[];
}) => {
}): Promise<TimeRangeMetadata['sources']> => {
const getRequests = getDocumentTypeRequestsFn({
enableContinuousRollups,
start,
Expand All @@ -131,25 +120,36 @@ const getDocumentTypesInfo = async ({
const sourceRequests = documentTypesToCheck.flatMap(getRequests);

const allSearches = sourceRequests
.flatMap(({ before, current, durationSummaryCheck }) => [before, current, durationSummaryCheck])
.flatMap(({ documentTypeQuery, durationSummaryNotSupportedQuery }) => [
documentTypeQuery,
durationSummaryNotSupportedQuery,
])
.filter((request): request is ReturnType<typeof getRequest> => request !== undefined);

const allResponses = (await apmEventClient.msearch('get_document_availability', ...allSearches))
.responses;

const hasAnyLegacyDocuments = sourceRequests.some(
({ documentType, rollupInterval }, index) =>
isLegacyDocType(documentType, rollupInterval) &&
allResponses[index + QUERY_INDEX.DURATION_SUMMARY_NOT_SUPPORTED].hits.total.value > 0
);

return sourceRequests.map(({ documentType, rollupInterval, ...queries }) => {
const numberOfQueries = Object.values(queries).filter(Boolean).length;
// allResponses is sorted by the order of the requests in sourceRequests
const docTypeResponses = allResponses.splice(0, numberOfQueries);
const hasDocs = docTypeResponses[QUERY_INDEX.DOCUMENT_TYPE].hits.total.value > 0;
// can only use >=8.7 document types (ServiceTransactionMetrics or TransactionMetrics with 10m and 60m intervals)
// if there are no legacy documents
const canUseContinousRollupDocs = hasDocs && !hasAnyLegacyDocuments;

return {
documentType,
rollupInterval,
hasDocBefore: docTypeResponses[QUERY_INDEX.BEFORE].hits.total.value > 0,
hasDocAfter: docTypeResponses[QUERY_INDEX.CURRENT].hits.total.value > 0,
allHaveDurationSummary: docTypeResponses[QUERY_INDEX.DURATION_SUMMARY]
? docTypeResponses[QUERY_INDEX.DURATION_SUMMARY].hits.total.value === 0
: true,
hasDocs: isLegacyDocType(documentType, rollupInterval) ? hasDocs : canUseContinousRollupDocs,
// all >=8.7 document types with rollups support duration summary
hasDurationSummaryField: canUseContinousRollupDocs,
};
});
};
Expand All @@ -168,9 +168,7 @@ const getDocumentTypeRequestsFn =
}) =>
(documentType: ApmDocumentType) => {
const currentRange = rangeQuery(start, end);
const diff = end - start;
const kql = kqlQuery(kuery);
const beforeRange = rangeQuery(start - diff, end - diff);

const rollupIntervals = enableContinuousRollups
? getConfigForDocumentType(documentType).rollupIntervals
Expand All @@ -179,48 +177,26 @@ const getDocumentTypeRequestsFn =
return rollupIntervals.map((rollupInterval) => ({
documentType,
rollupInterval,
before: getRequest({
documentType,
rollupInterval,
filters: [...kql, ...beforeRange],
}),
current: getRequest({
documentTypeQuery: getRequest({
documentType,
rollupInterval,
filters: [...kql, ...currentRange],
}),
...(documentType !== ApmDocumentType.ServiceTransactionMetric
...(isLegacyDocType(documentType, rollupInterval)
? {
durationSummaryCheck: getRequest({
durationSummaryNotSupportedQuery: getRequest({
documentType,
rollupInterval,
filters: [...kql, ...currentRange, getDurationLegacyFilter()],
filters: [...kql, ...currentRange, isDurationSummaryNotSupportedFilter()],
}),
}
: {}),
: undefined),
}));
};

const mapToSources = (sources: DocumentTypeData[], hasAnySourceDocBefore: boolean) => {
return sources.map((source) => {
const { documentType, hasDocAfter, hasDocBefore, rollupInterval, allHaveDurationSummary } =
source;

const hasDocBeforeOrAfter = hasDocBefore || hasDocAfter;

// If there is any data before, we require that data is available before
// this time range to mark this source as available. If we don't do that,
// users that upgrade to a version that starts generating service tx metrics
// will see a mostly empty screen for a while after upgrading.
// If we only check before, users with a new deployment will use raw transaction
// events.
const hasDocs = hasAnySourceDocBefore ? hasDocBefore : hasDocBeforeOrAfter;

return {
documentType,
rollupInterval,
hasDocs,
hasDurationSummaryField: allHaveDurationSummary,
};
});
/**
 * Returns true only when `documentType` is `ApmDocumentType.TransactionMetric`
 * and `rollupInterval` is `RollupInterval.OneMinute`; every other
 * combination returns false.
 *
 * NOTE(review): per the commit description, this is presumably the only
 * document type / interval combination that APM Server < 8.7 could
 * produce — confirm against the APM Server changelog.
 */
const isLegacyDocType = (documentType: ApmDocumentType, rollupInterval: RollupInterval) => {
return (
documentType === ApmDocumentType.TransactionMetric &&
rollupInterval === RollupInterval.OneMinute
);
};
Original file line number Diff line number Diff line change
Expand Up @@ -164,17 +164,10 @@ export function isRootTransaction(searchAggregatedTransactions: boolean) {
};
}

export function getDurationLegacyFilter(): QueryDslQueryContainer {
export function isDurationSummaryNotSupportedFilter(): QueryDslQueryContainer {
return {
bool: {
must: [
{
bool: {
filter: [{ exists: { field: TRANSACTION_DURATION_HISTOGRAM } }],
must_not: [{ exists: { field: TRANSACTION_DURATION_SUMMARY } }],
},
},
],
must_not: [{ exists: { field: TRANSACTION_DURATION_SUMMARY } }],
},
};
}

0 comments on commit 314d6ee

Please sign in to comment.