Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

usage reporting: fix memory leak #6998

Merged
merged 1 commit into from Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/tasty-elephants-give.md
@@ -0,0 +1,5 @@
---
'@apollo/server': patch
---

Fix a slow memory leak in the usage reporting plugin (#6983).
101 changes: 56 additions & 45 deletions packages/server/src/plugin/usageReporting/plugin.ts
Expand Up @@ -46,22 +46,6 @@ const reportHeaderDefaults = {
uname: `${os.platform()}, ${os.type()}, ${os.release()}, ${os.arch()})`,
};

class ReportData {
report!: OurReport;
readonly header: ReportHeader;
constructor(executableSchemaId: string, graphRef: string) {
this.header = new ReportHeader({
...reportHeaderDefaults,
executableSchemaId,
graphRef,
});
this.reset();
}
reset() {
this.report = new OurReport(this.header);
}
}

export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
options: ApolloServerPluginUsageReportingOptions<TContext> = Object.create(
null,
Expand Down Expand Up @@ -143,9 +127,45 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
cache: LRUCache<string, OperationDerivedData>;
} | null = null;

const reportDataByExecutableSchemaId: {
[executableSchemaId: string]: ReportData | undefined;
} = Object.create(null);
// This map maps from executable schema ID (schema hash, basically) to the
// report we'll send about it. That's because when we're using a gateway,
// the schema can change over time, but each report needs to be about a
// single schema. We avoid having this function be a memory leak by
// removing values from it when we're in the process of sending reports.
// That means we have to be very careful never to pull a Report out of it
// and hang on to it for a while before writing to it, because the report
// might have gotten sent and discarded in the meantime. So you should
// only access the values of this Map via
// getReportWhichMustBeUsedImmediately and getAndDeleteReport, and never
// hang on to the value returned by getReportWhichMustBeUsedImmediately.
const reportByExecutableSchemaId = new Map<string, OurReport>();
const getReportWhichMustBeUsedImmediately = (
executableSchemaId: string,
): OurReport => {
const existing = reportByExecutableSchemaId.get(executableSchemaId);
if (existing) {
return existing;
}
const report = new OurReport(
new ReportHeader({
...reportHeaderDefaults,
executableSchemaId,
graphRef,
}),
);
reportByExecutableSchemaId.set(executableSchemaId, report);
return report;
};
const getAndDeleteReport = (
executableSchemaId: string,
): OurReport | null => {
const report = reportByExecutableSchemaId.get(executableSchemaId);
if (report) {
reportByExecutableSchemaId.delete(executableSchemaId);
return report;
}
return null;
};

const overriddenExecutableSchemaId = options.overrideReportedSchema
? computeCoreSchemaHash(options.overrideReportedSchema)
Expand Down Expand Up @@ -192,21 +212,10 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
return id;
}

const getReportData = (executableSchemaId: string): ReportData => {
const existing = reportDataByExecutableSchemaId[executableSchemaId];
if (existing) {
return existing;
}
const reportData = new ReportData(executableSchemaId, graphRef);
reportDataByExecutableSchemaId[executableSchemaId] = reportData;
return reportData;
};

async function sendAllReportsAndReportErrors(): Promise<void> {
await Promise.all(
Object.keys(reportDataByExecutableSchemaId).map(
(executableSchemaId) =>
sendReportAndReportErrors(executableSchemaId),
[...reportByExecutableSchemaId.keys()].map((executableSchemaId) =>
sendReportAndReportErrors(executableSchemaId),
),
);
}
Expand All @@ -228,13 +237,11 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(

// Needs to be an arrow function to be confident that key is defined.
const sendReport = async (executableSchemaId: string): Promise<void> => {
const reportData = getReportData(executableSchemaId);
const { report } = reportData;
reportData.reset();

const report = getAndDeleteReport(executableSchemaId);
if (
Object.keys(report.tracesPerQuery).length === 0 &&
report.operationCount === 0
!report ||
(Object.keys(report.tracesPerQuery).length === 0 &&
report.operationCount === 0)
) {
return;
}
Expand Down Expand Up @@ -577,10 +584,12 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
treeBuilder.stopTiming();
const executableSchemaId =
overriddenExecutableSchemaId ?? executableSchemaIdForSchema(schema);
const reportData = getReportData(executableSchemaId);

if (includeOperationInUsageReporting === false) {
if (resolvedOperation) reportData.report.operationCount++;
if (resolvedOperation) {
getReportWhichMustBeUsedImmediately(executableSchemaId)
.operationCount++;
}
return;
}

Expand Down Expand Up @@ -634,8 +643,6 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
overriddenExecutableSchemaId ??
executableSchemaIdForSchema(schema);

const reportData = getReportData(executableSchemaId);
const { report } = reportData;
const { trace } = treeBuilder;

let statsReportKey: string | undefined = undefined;
Expand Down Expand Up @@ -673,9 +680,12 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
throw new Error(`Error encoding trace: ${protobufError}`);
}

if (resolvedOperation) report.operationCount++;
if (resolvedOperation) {
getReportWhichMustBeUsedImmediately(executableSchemaId)
.operationCount++;
}

report.addTrace({
getReportWhichMustBeUsedImmediately(executableSchemaId).addTrace({
statsReportKey,
trace,
// We include the operation as a trace (rather than aggregated
Expand All @@ -700,7 +710,8 @@ export function ApolloServerPluginUsageReporting<TContext extends BaseContext>(
// If the buffer gets big (according to our estimate), send.
if (
sendReportsImmediately ||
report.sizeEstimator.bytes >=
getReportWhichMustBeUsedImmediately(executableSchemaId)
.sizeEstimator.bytes >=
(options.maxUncompressedReportSize || 4 * 1024 * 1024)
) {
await sendReportAndReportErrors(executableSchemaId);
Expand Down